diff --git a/app/queues/fetch-messages.server.ts b/app/queues/fetch-messages.server.ts index f4c6dbe..1686e13 100644 --- a/app/queues/fetch-messages.server.ts +++ b/app/queues/fetch-messages.server.ts @@ -10,6 +10,7 @@ type Payload = { export default Queue("fetch messages", async ({ data }) => { const { phoneNumberId } = data; + logger.info(`Fetching messages for phone number with id=${phoneNumberId}`); const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId }, include: { twilioAccount: true }, @@ -26,8 +27,11 @@ export default Queue("fetch messages", async ({ data }) => { ]); const messagesSent = sent.filter((message) => message.direction.startsWith("outbound")); const messagesReceived = received.filter((message) => message.direction === "inbound"); - const messages = [...messagesSent, ...messagesReceived]; + logger.info( + `Found ${messagesSent.length} outbound messages and ${messagesReceived.length} inbound messages for phone number with id=${phoneNumberId}`, + ); + const messages = [...messagesSent, ...messagesReceived]; await insertMessagesQueue.add(`insert messages of id=${phoneNumberId}`, { phoneNumberId, messages, diff --git a/app/queues/fetch-phone-calls.server.ts b/app/queues/fetch-phone-calls.server.ts index 28c8e22..f5be9b8 100644 --- a/app/queues/fetch-phone-calls.server.ts +++ b/app/queues/fetch-phone-calls.server.ts @@ -10,6 +10,7 @@ type Payload = { export default Queue("fetch phone calls", async ({ data }) => { const { phoneNumberId } = data; + logger.info(`Fetching phone calls for phone number with id=${phoneNumberId}`); const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId }, include: { twilioAccount: true }, @@ -24,8 +25,11 @@ export default Queue("fetch phone calls", async ({ data }) => { twilioClient.calls.list({ from: phoneNumber.number }), twilioClient.calls.list({ to: phoneNumber.number }), ]); - const calls = [...callsSent, ...callsReceived]; + logger.info( + `Found ${callsSent.length} outbound calls and ${callsReceived.length} inbound calls for phone number with id=${phoneNumberId}`, + ); + const calls = [...callsSent, ...callsReceived]; await insertCallsQueue.add(`insert calls of id=${phoneNumberId}`, { phoneNumberId, calls, diff --git a/app/queues/insert-messages.server.ts b/app/queues/insert-messages.server.ts index 0b2129b..526c9be 100644 --- a/app/queues/insert-messages.server.ts +++ b/app/queues/insert-messages.server.ts @@ -13,8 +13,10 @@ type Payload = { export default Queue("insert messages", async ({ data }) => { const { messages, phoneNumberId } = data; + logger.info(`Inserting ${messages.length} messages for phone number with id=${phoneNumberId}`); const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId } }); if (!phoneNumber) { + logger.warn(`No phone number found with id=${phoneNumberId}`); return; } @@ -37,7 +39,7 @@ export default Queue("insert messages", async ({ data }) => { .sort((a, b) => a.sentAt.getTime() - b.sentAt.getTime()); const { count } = await db.message.createMany({ data: sms, skipDuplicates: true }); - logger.info(`inserted ${count} new messages for phoneNumberId=${phoneNumberId}`); + logger.info(`Inserted ${count} new messages for phone number with id=${phoneNumberId}`); if (!phoneNumber.isFetchingMessages) { return; diff --git a/app/queues/insert-phone-calls.server.ts b/app/queues/insert-phone-calls.server.ts index 89a2287..f808b6f 100644 --- a/app/queues/insert-phone-calls.server.ts +++ b/app/queues/insert-phone-calls.server.ts @@ -13,8 +13,10 @@ type Payload = { export default Queue("insert phone calls", async ({ data }) => { const { calls, phoneNumberId } = data; + logger.info(`Inserting ${calls.length} phone calls for phone number with id=${phoneNumberId}`); const phoneNumber = await db.phoneNumber.findUnique({ where: { id: phoneNumberId } }); if (!phoneNumber) { + logger.warn(`No phone number found with id=${phoneNumberId}`); return; } @@ -37,7 +39,7 @@ export default Queue("insert phone calls", async ({ data }) => { .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()); const { count } = await db.phoneCall.createMany({ data: phoneCalls, skipDuplicates: true }); - logger.info(`inserted ${count} new phone calls for phoneNumberId=${phoneNumberId}`); + logger.info(`Inserted ${count} new phone calls for phone number with id=${phoneNumberId}`); if (!phoneNumber.isFetchingCalls) { return; diff --git a/app/utils/queue.server.ts b/app/utils/queue.server.ts index 99dbb18..1e290ce 100644 --- a/app/utils/queue.server.ts +++ b/app/utils/queue.server.ts @@ -33,7 +33,52 @@ export function Queue( defaultJobOptions: jobOptions, connection: redis, }); - const worker = new Worker(name, handler, { connection: redis }); + queue.on("error", (error) => logger.error(`queue:${name}:error`, error)); + queue.on("cleaned", (jobs, type) => + logger.debug(`queue:${name}:cleaned`, `${jobs.length} jobs cleaned (type=${type})`), + ); + queue.on("waiting", (job) => logger.debug(`queue:${name}:waiting`, `job "${job.name}" is waiting`)); + queue.on("paused", () => logger.debug(`queue:${name}:paused`)); + queue.on("resumed", () => logger.debug(`queue:${name}:resumed`)); + queue.on("ioredis:close", () => logger.debug(`queue:${name}:ioredis:close`)); + queue.on("removed", (job) => logger.debug(`queue:${name}:removed`, `job "${job.name}" has been removed`)); + queue.on("progress", (job, progress) => + logger.debug(`queue:${name}:progress`, `job "${job.name}" has progressed => ${progress}`), + ); + + const worker = new Worker( + name, + async (job, token) => { + try { + const res = await handler(job, token); + logger.debug(`queue:${name}:worker-success`, `worker finished job ${job.name}`); + return res; + } catch (error) { + logger.error(`queue:${name}:worker-error`, `worker error for job ${job.name}`, error); + return null; + } + }, + { connection: redis }, + ); + worker.on("failed", (job, error, prev) => + logger.error(`job "${job.name}" failed with error ${error} (prev=${prev})`), + ); + worker.on("completed", (job, error, prev) => + logger.debug(`job "${job.name}" completed (error=${error}) (prev=${prev})`), + ); + + worker.on("error", (error) => logger.error(`worker:${name}:error`, error)); + worker.on("paused", () => logger.debug(`worker:${name}:paused`)); + worker.on("resumed", () => logger.debug(`worker:${name}:resumed`)); + worker.on("ioredis:close", () => logger.debug(`worker:${name}:ioredis:close`)); + worker.on("progress", (job, progress) => + logger.debug(`worker:${name}:progress`, `job "${job.name}" has progressed => ${progress}`), + ); + worker.on("active", (job) => logger.debug(`worker:${name}:active job "${job.name}"`)); + worker.on("closed", () => logger.debug(`worker:${name}:closed`)); + worker.on("closing", (msg) => logger.debug(`worker:${name}:closing msg="${msg}"`)); + worker.on("drained", () => logger.debug(`worker:${name}:drained`)); + const scheduler = new QueueScheduler(name, { connection: redis }); registeredQueues.set(name, { queue, worker, scheduler }); @@ -58,7 +103,7 @@ export function CronJob( const queue = Queue(name, handler, jobOptions); queue.add(name, undefined, jobOptions); - logger.info(`registered cron job "${name}"`); + logger.debug(`registered cron job "${name}"`); return queue; }; }