diff --git a/app/queues/index.ts b/app/queues/index.ts index f613a2c..a33ab4a 100644 --- a/app/queues/index.ts +++ b/app/queues/index.ts @@ -3,6 +3,7 @@ import fetchPhoneCallsQueue from "./fetch-phone-calls.server"; import insertPhoneCallsQueue from "./insert-phone-calls.server"; import fetchMessagesQueue from "./fetch-messages.server"; import insertMessagesQueue from "./insert-messages.server"; +import setTwilioWebhooksQueue from "./set-twilio-webhooks.server"; export default [ deleteUserDataQueue, @@ -10,4 +11,5 @@ export default [ insertPhoneCallsQueue, fetchMessagesQueue, insertMessagesQueue, + setTwilioWebhooksQueue, ]; diff --git a/app/queues/insert-incoming-message.server.ts b/app/queues/insert-incoming-message.server.ts new file mode 100644 index 0000000..f32e0c4 --- /dev/null +++ b/app/queues/insert-incoming-message.server.ts @@ -0,0 +1,61 @@ +import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message"; +import { Direction } from "@prisma/client"; + +import { Queue } from "~/utils/queue.server"; +import db from "~/utils/db.server"; +import getTwilioClient, { translateMessageDirection, translateMessageStatus } from "~/utils/twilio.server"; +import logger from "~/utils/logger.server"; + +type Payload = { + phoneNumberId: string; + messageSid: MessageInstance["sid"]; +}; + +export default Queue("insert incoming message", async ({ data }) => { + const { messageSid, phoneNumberId } = data; + logger.info(`received message ${messageSid} for ${phoneNumberId}`); + const phoneNumber = await db.phoneNumber.findUnique({ + where: { id: phoneNumberId }, + include: { + organization: { + select: { twilioAccount: true }, + }, + }, + }); + if (!phoneNumber) { + logger.warn(`No phone number found with id=${phoneNumberId}`); + return; + } + + const twilioAccount = phoneNumber.organization.twilioAccount; + if (!twilioAccount) { + logger.warn(`Phone number with id=${phoneNumberId} doesn't have a connected twilio account`); + return; + } + + const twilioClient = getTwilioClient(twilioAccount); + const message = await twilioClient.messages.get(messageSid).fetch(); + const status = translateMessageStatus(message.status); + const direction = translateMessageDirection(message.direction); + await db.message.create({ + data: { + phoneNumberId, + id: messageSid, + recipient: direction === Direction.Outbound ? message.to : message.from, + to: message.to, + from: message.from, + status, + direction, + sentAt: new Date(message.dateCreated), + content: message.body, + }, + }); + + /*await notifyIncomingMessageQueue.enqueue( + { + messageSid, + phoneNumberId, + }, + { id: `notify-${messageSid}-${organizationId}-${phoneNumberId}` }, + );*/ +}); diff --git a/app/routes/webhooks/message.ts b/app/routes/webhooks/message.ts new file mode 100644 index 0000000..dc99a64 --- /dev/null +++ b/app/routes/webhooks/message.ts @@ -0,0 +1,103 @@ +import { type ActionFunction } from "@remix-run/node"; +import { badRequest, html, notFound, serverError } from "remix-utils"; +import { Prisma, SubscriptionStatus } from "@prisma/client"; + +import insertIncomingMessageQueue from "~/queues/insert-incoming-message.server"; +import logger from "~/utils/logger.server"; +import db from "~/utils/db.server"; +import twilio from "twilio"; +import { smsUrl } from "~/utils/twilio.server"; +import { decrypt } from "~/utils/encryption"; + +export const action: ActionFunction = async ({ request }) => { + const twilioSignature = request.headers.get("X-Twilio-Signature") || request.headers.get("x-twilio-signature"); + if (!twilioSignature || Array.isArray(twilioSignature)) { + return badRequest("Invalid header X-Twilio-Signature"); + } + + const body: Body = await request.json(); + try { + const phoneNumbers = await db.phoneNumber.findMany({ + where: { number: body.To }, + include: { + organization: { + include: { + subscriptions: { + where: { + OR: [ + { status: { not: SubscriptionStatus.deleted } }, + { + status: SubscriptionStatus.deleted, + cancellationEffectiveDate: { gt: new Date() }, + }, + ], + }, + orderBy: { lastEventTime: Prisma.SortOrder.desc }, + }, + twilioAccount: true, + }, + }, + }, + }); + if (phoneNumbers.length === 0) { + // phone number is not registered by any organization + return notFound("Phone number not found"); + } + + const phoneNumbersWithActiveSub = phoneNumbers.filter( + (phoneNumber) => phoneNumber.organization.subscriptions.length > 0, + ); + if (phoneNumbersWithActiveSub.length === 0) { + // accept the webhook but don't store incoming message + // because the organization is on the free plan + return html(""); + } + + const phoneNumber = phoneNumbersWithActiveSub.find((phoneNumber) => { + // if multiple organizations have the same number + // find the organization currently using that phone number + // maybe we shouldn't let that happen by restricting a phone number to one org? + const encryptedAuthToken = phoneNumber.organization.twilioAccount?.accountAuthToken; + const authToken = encryptedAuthToken ? decrypt(encryptedAuthToken) : ""; + return twilio.validateRequest(authToken, twilioSignature, smsUrl, body); + }); + if (!phoneNumber) { + return badRequest("Invalid webhook"); + } + + const messageSid = body.MessageSid; + const phoneNumberId = phoneNumber.id; + await insertIncomingMessageQueue.add(`insert message ${messageSid} for ${phoneNumberId}`, { + messageSid, + phoneNumberId, + }); + + return html(""); + } catch (error: any) { + logger.error(error); + + return serverError(error.message); + } +}; + +type Body = { + ToCountry: string; + ToState: string; + SmsMessageSid: string; + NumMedia: string; + ToCity: string; + FromZip: string; + SmsSid: string; + FromState: string; + SmsStatus: string; + FromCity: string; + Body: string; + FromCountry: string; + To: string; + ToZip: string; + NumSegments: string; + MessageSid: string; + AccountSid: string; + From: string; + ApiVersion: string; +};