Conversation Service @ Enginy
The problem we solved
Entity workflows in Temporal
From message to conversation
Architecture & the action queue evolution
7 scenarios. LLMs allowed. Good luck.
01
The problem we solved
The message service
message service
(Single Instance)
Cron loop
over ALL conversations
LinkedIn / Email
What happened every cycle
Refresh messages
LinkedIn API call
Check sequence
Is it time to act?
Send message
If appropriate
Next conversation
Repeat
Even conversations with nothing to do still consumed a LinkedIn API call just to check for new messages. With thousands of conversations, most refreshes were wasted.
Each iteration was independent. No rate limit tracking across conversations. Conversation A could burn all the API quota, leaving B delayed for the entire cycle.
When scale made the cron untenable
⏰
The cron cycle took so long that, by the time it looped back, messages that should have been sent hours earlier were still pending.
💥
If the cron job crashed mid-loop, everything stopped. No partial progress was saved. It had to restart from the top.
📈
Refreshing all conversations for the same identity flooded LinkedIn with API calls. No coordination — rate limits hit hard.
Bottom line: The architecture fundamentally couldn't scale. We didn't need a faster cron — we needed a completely different model.
02
Entity workflows in Temporal
One workflow per conversation, one call manager per identity
conversationWorkflow
1 per conversation
enqueue
actions
callManagerWorkflow
1 per identity
execute
actions
Channels
Scalable: Each conversation is its own durable workflow
Rate-aware: Call manager controls limits per identity
Resilient: Failures are isolated, retried, and recovered
Why the split matters
conversationWorkflow
Scope: 1 per conversation
Lifetime: Days to months. Lives as long as the conversation is active.
callManagerWorkflow
Scope: 1 per identity (shared across all conversations)
Lifetime: On-demand. Starts when actions are queued, exits when empty.
The key insight: Conversations don't talk to LinkedIn/Email directly. They enqueue actions, and the call manager is the single gatekeeper. This is what makes rate limiting and working hours possible — one bottleneck per identity, by design.
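The gatekeeper idea can be sketched in memory. This is a minimal illustration, not the service's code: IdentityGate, enqueue, drain, and pending are hypothetical names, and a Map stands in for the Temporal workflows and DB queue described later in the deck.

```typescript
// Hypothetical model: conversations never call the provider; they only enqueue.
type GateAction = { conversationId: string; identityId: string; type: string }

class IdentityGate {
  // One FIFO queue per identity: the single place where limits can be enforced.
  private queues = new Map<string, GateAction[]>()

  enqueue(action: GateAction): void {
    const queue = this.queues.get(action.identityId) ?? []
    queue.push(action)
    this.queues.set(action.identityId, queue)
  }

  // Takes at most `budget` actions for one identity, oldest first.
  drain(identityId: string, budget: number): GateAction[] {
    const queue = this.queues.get(identityId) ?? []
    return queue.splice(0, budget)
  }

  pending(identityId: string): number {
    return this.queues.get(identityId)?.length ?? 0
  }
}

const gate = new IdentityGate()
gate.enqueue({ conversationId: 'c1', identityId: 'alice', type: 'SEND_LINKEDIN_MESSAGE' })
gate.enqueue({ conversationId: 'c2', identityId: 'alice', type: 'SEND_LINKEDIN_MESSAGE' })
gate.enqueue({ conversationId: 'c3', identityId: 'bob', type: 'SEND_EMAIL_MESSAGE' })

// Alice's budget is spent on her own queue only; Bob's action is untouched.
const executedForAlice = gate.drain('alice', 1)
```

Because every action for an identity passes through one queue, a budget applied in drain() limits that identity as a whole rather than each conversation separately.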
03
From message to conversation
Gradual rollout, client by client
Phase 1
Internal clients
Phase 2
Small clients batch
Phase 3
Growing batches
Phase 4
All clients
A per-client flag controlled which service handled that client's conversations. We moved clients batch by batch, monitoring for issues between phases.
A missing else clause meant some clients fell through the cracks — no messages refreshed or sent for almost a week before anyone noticed.
Lesson: Gradual rollouts catch bugs early — but only if you're monitoring the clients that are not migrated yet, too.
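One way to make a missing branch hard to write is an exhaustive switch plus a coverage count for both services. The sketch below is an assumption about how such a guard could look: the flag values ('legacy', 'migrated') and the helper names are hypothetical, since the slides only say there was one flag per client.

```typescript
type Handler = 'message-service' | 'conversation-service'
type RolloutFlag = 'legacy' | 'migrated' // hypothetical flag values
type RolloutClient = { clientId: string; flag: RolloutFlag }

function handlerFor(flag: RolloutFlag): Handler {
  switch (flag) {
    case 'legacy':
      return 'message-service'
    case 'migrated':
      return 'conversation-service'
    default: {
      // Exhaustiveness check: a new flag value without its own case fails to type-check.
      const unreachable: never = flag
      throw new Error(`Unhandled rollout flag: ${String(unreachable)}`)
    }
  }
}

// Monitoring helper: count clients per handler so both sides of the rollout stay visible.
function coverage(clients: RolloutClient[]): Record<Handler, number> {
  const totals: Record<Handler, number> = { 'message-service': 0, 'conversation-service': 0 }
  for (const client of clients) totals[handlerFor(client.flag)] += 1
  return totals
}

const rolloutCoverage = coverage([
  { clientId: 'acme', flag: 'legacy' },
  { clientId: 'globex', flag: 'migrated' },
  { clientId: 'initech', flag: 'legacy' },
])
```

If the two counts do not add up to the total number of clients, some client is handled by neither service.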
From extremely buggy to extremely resilient
The early days of the conversation service were rough. Temporal gave us durability, but the orchestration logic itself had to be battle-tested.
Multiple conversations signaling the call manager simultaneously. Actions processed out of order or duplicated.
Actions stored in workflow memory or Redis disappeared on restarts. Conversations would hang indefinitely.
Every workflow change required careful patching. Long-lived conversations could run for months — old code paths had to be preserved.
Today: The service has become one of the most resilient parts of the platform. The bugs forced us to build robust error handling, idempotency, and recovery patterns that make it very hard to break.
04
Architecture & the action queue evolution
One per conversation — orchestrates the campaign
From start to completion
Start
Load state
Wait ACTIVE
activeToll
Execute Graph
node by node
Wait for Reply
up to 14 days
Auto-Reply
AI loop
Complete
Campaigns are directed graphs. Nodes are actions (LinkedIn, Email, WhatsApp, Task...). Edges are conditions (time passed, connection accepted, has email...). The workflow traverses the graph node by node.
At any point, if the lead replies, the workflow breaks out of the sequence and transitions to the auto-reply loop (if AI is enabled) or completes.
Legacy: The old initialMessage mode used flat sequences (step 1, step 2...). Deprecated in favor of the conditional graph model. Still supported via patching for old workflows.
Step by step
// 1. Create state holder — loads campaign, messages, config from DB
const conversationState = await ConversationStateHolder.create({
clientId, conversationId, identityId, exportToCrmWorkflowId
})
// 2. Wait until campaign is ACTIVE (could be DRAFT/PENDING for days)
await conversationState.activeToll()
// 3. Race: run conversation flow vs external completion signal
await Promise.race([
conversationFlow(conversationState),
condition(() => conversationState.status === 'COMPLETED'),
])
// If completed externally, in-flight activities are cancelled automatically
ConversationStateHolder.create(): Calls the getConversationState() activity, parses the campaign graph, registers all signal handlers, and initializes in-memory state.
activeToll(): condition(() => status === 'ACTIVE'). Blocks until the workflow is unpaused. Used between every step as a gate.
The main execution path
async function conversationFlow(state: ConversationStateHolder) {
await state.activeToll()
// Execute the campaign graph — races against BREAK_FLOW (lead replied)
const sequencePromise = executeConditionalSequence(state)
await Promise.race([sequencePromise, condition(() => !!state.BREAK_FLOW)])
// Wait for lead to reply (up to 14 days)
const hasNewMessages = await state.waitForNewLeadMessagesOrCompletion()
if (!hasNewMessages) return await state.completeConversation()
// Detect OOO, forwards, bounces
const action = await processLeadMessageType(state)
if (action === 'COMPLETE') return
// AI auto-reply loop (if enabled)
if (state.shouldStartAutoReplyLoop()) await autoReplyLoop(state)
}
BREAK_FLOW: When a lead replies while we're executing the sequence, a signal sets BREAK_FLOW = true. The race resolves, the sequence is cancelled, and we jump to reply handling. This is how we interrupt any step mid-flight.
executeNode → computeNextEdge → loop
Message, Connection, InMail, Attachment, Voice, Bundle
Gmail, Outlook, IMAP
Message
Other
Task, Like Post, Add to Campaign
Sleep
node.time delay
Check Pause
activeToll
executeNode()
send action
computeNextEdge()
evaluate conditions
Next Node
or END
TimePassed ·
AcceptsConnection ·
HasProfessionalEmail ·
HasWhatsAppAccount ·
FieldCondition ·
NotAlreadyContacted · and more.
First matching edge wins.
Graph traversal, node by node
async function conditionalSequenceLoop(state, currentNode: CampaignNode) {
// 1. Sleep if node has a time delay (e.g. "wait 3 days")
if (currentNode.data?.time) await sleep(currentNode.data.time)
// 2. Gate: wait if conversation is paused
await state.activeToll()
// 3. Execute the node action
const nodeOutput = await executeNode(state, currentNode)
// 4. Evaluate outgoing edges to find the next node
const nextEdge = await computeNextEdge(state, currentNode, nodeOutput)
if (!nextEdge) return // END node or no matching edge
// 5. Recurse into the next node
await conditionalSequenceLoop(state, nextEdge.targetNode)
}
computeNextEdge(): Evaluates each outgoing edge's conditions (TimePassed, AcceptsConnection, HasEmail, FieldCondition...). First matching edge wins. This is how campaigns branch.
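The "first matching edge wins" rule can be illustrated with a small pure function. The shapes below (LeadFacts, SketchEdge, firstMatchingEdge) are hypothetical and much simpler than the real condition types; only the ordering rule comes from the slides.

```typescript
// Hypothetical shapes for illustration.
type LeadFacts = { connectionAccepted: boolean; hasProfessionalEmail: boolean }
type SketchEdge = { targetNodeId: string; condition: (facts: LeadFacts) => boolean }

// Edges are evaluated in declaration order, so order encodes priority.
function firstMatchingEdge(edges: SketchEdge[], facts: LeadFacts): SketchEdge | undefined {
  return edges.find((edge) => edge.condition(facts))
}

const outgoingEdges: SketchEdge[] = [
  { targetNodeId: 'linkedin-message', condition: (f) => f.connectionAccepted },
  { targetNodeId: 'email', condition: (f) => f.hasProfessionalEmail },
  { targetNodeId: 'task', condition: () => true }, // fallback edge
]

// Connection not accepted, but an email exists: the second edge is chosen.
const chosenEdge = firstMatchingEdge(outgoingEdges, {
  connectionAccepted: false,
  hasProfessionalEmail: true,
})
```

Returning undefined when nothing matches corresponds to the "END node or no matching edge" branch in the traversal loop.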
From campaign node to enqueued action
// Routes to the right state method:
switch (node.type) {
case 'LINKEDIN':
return state.sendLinkedInMessage({
nodeId: node.id
})
case 'EMAIL':
return state.sendEmailMessage({
nodeId: node.id
})
case 'LINKEDIN_CONNECTION':
return state.sendConnectionRequest({
nodeId: node.id
})
// WHATSAPP, TASK, LIKE_POST, ...
}
async sendLinkedInMessage({ nodeId }) {
const input = {
buildFrom: buildMessageFrom(
content, initialMessageId, nodeId
),
lastMessageDbId: this.getLastMessage()?.id
}
// Enqueue + wait for result
return this.performConversationAction({
actionType: SEND_LINKEDIN_MESSAGE,
conversationId: this.conversationId,
data: input,
})
}
buildMessageFrom(): A pointer (nodeId, initialMessageId, or raw content) that tells the call manager how to build the message. The actual AI generation, template rendering, etc. happens on the call manager side.
The bridge to the call manager
async performConversationAction(action) {
while (true) {
// Gate: wait if paused/errored
if (!this.isConversationActive()) await this.activeToll()
// Enqueue in DB + signal call manager + await result
const result = await this.processIdentityAction(action, this.identityId)
// processIdentityAction:
// 1. executeConversationAction() → INSERT into DB + signal callManager
// 2. await condition(() => this.RESULT || this.BREAK_FLOW)
// 3. return this.RESULT (set by the CALL_RESULT signal handler)
if (result.status === 'success') return result.data
if (result.status === 'error') await this.setStatusToErrored(reason)
if (result.status === 'paused') continue // loop back; activeToll() will block
}
}
The loop matters: If the call manager finds the conversation is paused, it signals back status: 'paused'. The workflow loops back, hits activeToll(), blocks until unpaused, and retries. This makes the workflow self-healing across pause/resume cycles.
How external events modify workflow state
// Inside ConversationStateHolder.initialize():
setHandler(conversationStateChangedSignal, async (action) => {
if (action.type === 'NEW_MESSAGES') this.BREAK_FLOW = true
if (action.type === 'CONNECTION_REQUEST_ACCEPTED') this.isConnectedOnLinkedIn = true
if (action.type === 'STATUS_CHANGED') await this.computeConversationStatus(...)
if (action.type === 'CAMPAIGN_CHANGED') await this.computeCampaignData()
if (action.type === 'TASK_COMPLETED') this.completedTasks.push(action.taskId)
})
setHandler(callResultSignal, (action) => {
if (action.isManual) this.BREAK_FLOW = true // Manual send → interrupt sequence
else this.RESULT = action // Normal result → unblock await
})
conversationStateChangedSignal: Sent by external services (API, Refresh Service) when something happens: new messages arrive, a connection is accepted, the campaign is edited, or a task is completed. Modifies state in place, so condition() calls re-evaluate immediately.
callResultSignal: Sent by the call manager after processing an action. Sets this.RESULT, which unblocks the condition() inside processIdentityAction(). The conversation workflow resumes with the action result.
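The state-change handler can be read as a reducer over workflow state. The sketch below is a pure, simplified version covering three of the signal types; FlowState and applySignal are hypothetical names, and the real handler mutates the state holder in place and also awaits activities.

```typescript
type ConversationSignal =
  | { type: 'NEW_MESSAGES' }
  | { type: 'CONNECTION_REQUEST_ACCEPTED' }
  | { type: 'TASK_COMPLETED'; taskId: string }

interface FlowState {
  breakFlow: boolean
  isConnectedOnLinkedIn: boolean
  completedTasks: string[]
}

// Pure version of the handler body: returns the next state instead of mutating `this`.
function applySignal(state: FlowState, signal: ConversationSignal): FlowState {
  switch (signal.type) {
    case 'NEW_MESSAGES':
      return { ...state, breakFlow: true }
    case 'CONNECTION_REQUEST_ACCEPTED':
      return { ...state, isConnectedOnLinkedIn: true }
    case 'TASK_COMPLETED':
      return { ...state, completedTasks: [...state.completedTasks, signal.taskId] }
  }
}

const initialFlowState: FlowState = { breakFlow: false, isConnectedOnLinkedIn: false, completedTasks: [] }
const receivedSignals: ConversationSignal[] = [
  { type: 'CONNECTION_REQUEST_ACCEPTED' },
  { type: 'TASK_COMPLETED', taskId: 't1' },
  { type: 'NEW_MESSAGES' },
]
const flowStateAfterSignals = receivedSignals.reduce<FlowState>(applySignal, initialFlowState)
```

Any condition() that reads breakFlow would see true once NEW_MESSAGES has been applied, which is what lets the race in conversationFlow resolve.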
One per identity — controls rate limits, working hours, and execution
The action processing loop
getNextAction()
from DB queue
Working hours?
sleep if OOH
Rate limit check
delay if needed
processAction()
call LinkedIn/Email/...
Signal result
back to conversation
Working hours: Before processing any action, the call manager checks whether the identity is within its configured working hours. If not, it sleeps until the next available window (up to 6 days ahead).
Rate limit: If the same action type was executed less than 1 minute ago, the call manager adds a delay. This prevents LinkedIn/Email from detecting automation patterns.
Key insight: One call manager per identity means all conversations for that identity share the same rate limits and working hours. No conversation can accidentally burn another's quota.
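The one-minute spacing rule is simple enough to state as a pure function. This is a sketch under assumptions: antiAutomationDelayMs and LastAction are hypothetical names, and the real loop also consults shouldDelay(action), which is not modeled here.

```typescript
const ONE_MINUTE_MS = 60 * 1000

type LastAction = { type: string; timestamp: number }

// Returns how long to sleep before running `nextType` at time `now` (ms since epoch).
// Same type within the last minute: sleep a full minute. Otherwise: no delay.
function antiAutomationDelayMs(last: LastAction | undefined, nextType: string, now: number): number {
  if (!last || last.type !== nextType) return 0
  return now - last.timestamp < ONE_MINUTE_MS ? ONE_MINUTE_MS : 0
}

const lastSend: LastAction = { type: 'SEND_LINKEDIN_MESSAGE', timestamp: 1_000 }
const delayForSameType = antiAutomationDelayMs(lastSend, 'SEND_LINKEDIN_MESSAGE', 31_000)
const delayForOtherType = antiAutomationDelayMs(lastSend, 'SEND_EMAIL_MESSAGE', 31_000)
```

Since the last action is tracked per call manager, and there is one call manager per identity, the spacing applies across all of that identity's conversations.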
Step by step
async function workflowFunction({ clientId, identityId }) {
// 1. Register manual action handler (must be before any activity)
setHandler(manualConversationIdentityActionSignal, async (action) => {
const decision = await manager.getActionDecision(action)
if (decision.decision === 'ENQUEUE') return { success: false, message: decision.reason }
const result = await manager.processAction(action)
await manager.replySignal(action, 'success', result)
return { success: true }
})
// 2. Load identity state (local activity — fast, no network)
const identityState = await localGetIdentityState(identityId)
// 3. Unpause queues based on current identity health
const queuesToUnpause = []
if (!identityState.linkedInExpired) queuesToUnpause.push('linkedIn', 'inMail')
for (const email of identityState.emails)
if (!email.emailExpired) queuesToUnpause.push(`email-${email.email}`)
await unpauseMultipleQueues(identityId, queuesToUnpause)
// 4. Create the actions receiver
const manager = new ConversationIdentityActionsReceiver({ identityState })
// 5. Main processing loop (next slide)
}
while (getNextAction) → processAction → replySignal
let action, lastAction
while (action = await manager.getNextActionOrNull({ lastActionType: lastAction?.type })) {
// Anti-automation delay: same action type within 1 min → sleep(1 min)
if (shouldDelay(action) && lastAction?.type === action.actionType
&& Date.now() - lastAction.timestamp < ONE_MINUTE_MS) {
await sleep(ONE_MINUTE_MS)
}
try {
const result = await manager.processAction(action)
// Remember what ran and when, so the next iteration can apply the delay
lastAction = { type: action.actionType, timestamp: Date.now() }
await manager.replySignal(action, 'success', result)
} catch (error) {
// Route to specific handler based on error type:
// LINKEDIN_EXPIRED → handleLinkedInExpiredIdentity(action)
// EMAIL_EXPIRED → handleEmailExpiredIdentity(action, email)
// RATE_LIMIT → handleRateLimitReached(action)
// PROXY_FAILURE → retry up to 10 times
// CONVERSATION_COMPLETED → skip action, continue
// ...20+ error types
}
}
When the loop exits: getNextActionOrNull returns null when no actions are left. The workflow then calls waitForNewIdentityState() or exits. It will be restarted via signalWithStart when new actions arrive.
Working hours + rate limit gate before every action
async getNextActionOrNull({ lastActionType }): Promise<Action | null> {
// 1. Working hours check — sleep until next window if OOH
const waitTime = this.getWaitTimeIfOOH()
if (waitTime) { await condition(workingHoursChanged, waitTime); return recurse() }
// 2. Pull actions from DB one by one
while (action = await getNextAction(identityId, lastActionType)) {
const decision = await this.getActionDecision(action)
if (decision.decision === 'PROCEED') return action // ← ready to execute
// Not ready → move to appropriate queue
if (decision.reason === 'WAIT')
await pauseQueue(identityId, 'actions', decision.waitTimeMs, action.actionType)
await this.enqueueAction(action, decision)
// enqueueAction → addAction(identityId, action, 'linkedIn'|'email-X'|'wait'|'inMail')
}
// 3. Nothing ready — check if anything is parked at all
if (await noActionLeft(identityId)) return null
await this.waitForNewIdentityState(timeout) // sleep until signal or waitUntil
return recurse()
}
Should process? → Actually process
async processAction(action): Promise<any> {
// STEP 1: Should we still process this?
// getConversationStatusAndWaitIfActive() → checks DB
// - COMPLETED/DELETED → throw ConversationCompletedException (skip)
// - PAUSED → delete all paused actions, signal conversations, throw
// - NOT_REFRESHED → throw ConversationNotUpdatedException (re-enqueue)
// - BLACKLISTED → throw (skip)
// Also: if linkedIn message, checks lastMessageDbId hasn't changed
// → prevents sending duplicate/stale messages
// STEP 2: Actually process
switch (action.actionType) {
case SEND_LINKEDIN_MESSAGE:
const msg = await processSendLinkedInMessageAction(identityId, ...)
this.IDENTITY_STATE.sentLinkedInMessages.push({ id, sentAt })
return msg
case SEND_EMAIL_MESSAGE:
const result = await processSendEmailAction(conversationId, ...)
this.IDENTITY_STATE.emails.find(e).emailsSentTimestamps.push(Date.now())
return result
// SEND_CONNECTION_REQUEST, SEND_INMAIL, WHATSAPP, VISIT_PROFILE, ...
}
}
getConversationStatusAndWaitIfActive(): The safety net. Between queueing and executing, the conversation might have been paused, completed, or received a new message. This check prevents sending stale or duplicate messages — critical for avoiding extra messages the lead didn't expect.
Per-identity, per-channel controls
Decision engine: Before each action, getActionDecision() returns either PROCEED or ENQUEUE with a reason (WAIT, LINKEDIN_EXPIRED, EMAIL_EXPIRED, MISSING_LI_A, INMAIL_CREDITS_EXHAUSTED).
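A reduced sketch of such a decision function, covering only two of the listed reasons. The IdentitySnapshot fields and decideConnectionRequest are hypothetical; the real getActionDecision() reads more state and handles more reasons than shown here.

```typescript
type Decision =
  | { decision: 'PROCEED' }
  | { decision: 'ENQUEUE'; reason: 'WAIT' | 'LINKEDIN_EXPIRED'; waitTimeMs?: number }

// Hypothetical snapshot of the identity state relevant to connection requests.
interface IdentitySnapshot {
  linkedInExpired: boolean
  connectionRequestsToday: number
  dailyConnectionLimit: number
  msUntilMidnight: number
}

function decideConnectionRequest(identity: IdentitySnapshot): Decision {
  // Expired identity: park the action until the user reconnects.
  if (identity.linkedInExpired) return { decision: 'ENQUEUE', reason: 'LINKEDIN_EXPIRED' }
  // Daily limit reached: wait until the counter resets.
  if (identity.connectionRequestsToday >= identity.dailyConnectionLimit) {
    return { decision: 'ENQUEUE', reason: 'WAIT', waitTimeMs: identity.msUntilMidnight }
  }
  return { decision: 'PROCEED' }
}

const atLimit = decideConnectionRequest({
  linkedInExpired: false,
  connectionRequestsToday: 50,
  dailyConnectionLimit: 50,
  msUntilMidnight: 3 * 60 * 60 * 1000,
})
```

Modeling the result as a discriminated union keeps the reason attached to ENQUEUE only, so callers cannot read a reason off a PROCEED decision.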
Every failure has a recovery strategy
LinkedIn or Email disconnected
→ Move actions to linkedIn / email-{addr} queue. Unpause when reconnected.
Connection, message, or InMail
→ Re-enqueue with waitUntil = now + 4h. Automatic backoff.
Proxy, LinkedIn 500, email provider
→ Retry up to 5-10 times with short delays (2-5s). Then escalate.
Completed, paused, or blacklisted
→ Skip action, signal conversation workflow. No retry needed.
Provider daily/hourly limit
→ Wait based on provider's rate limit response (retry-after header).
Blocked, not deliverable, bounced
→ Complete conversation. Create info message for user visibility.
How conversation and call manager talk to each other
1. Build action payload
2. addAction() to DB
3. Signal call manager
4. await result signal
5. Process result
action + signal
result signal
conversationIdentity
Action
poll actions
1. getNextAction() from DB
2. Check working hours
3. Check rate limits
4. processAction()
5. Signal result back
conversationIdentityAction
actions (ready to execute) ·
linkedIn (waiting for reconnection) ·
email-{addr} (per email address) ·
wait (rate limit cooldown) ·
inMail (credits/capability)
From signals in memory to database persistence
Three iterations to get it right
1. Temporal signals: Actions sent as Temporal signals directly to the call manager. Lived in workflow memory.
Problem: Worker restart = all pending actions lost. No persistence, no recovery.
2. Redis: Moved action storage to Redis. Better persistence than memory, but still fragile.
Problem: No transactional guarantees. Edge cases with duplicate actions and lost state on Redis restarts.
3. Database: conversationIdentityAction table with idempotency history. Durable, transactional, queryable.
Result: Actions survive any failure. Idempotency prevents duplicates. Queue semantics built-in.
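The idempotency idea can be shown with an in-memory stand-in for the table. Everything here is an assumption for illustration: ActionTable, the idempotencyKey field, and the key format are hypothetical, and the real table gets its durability and transactions from the database.

```typescript
type QueuedAction = { idempotencyKey: string; conversationId: string; actionType: string }

class ActionTable {
  private rows: QueuedAction[] = []
  private seenKeys = new Set<string>() // stands in for the idempotency history

  // Returns true if a row was inserted, false if the key was already seen.
  addAction(action: QueuedAction): boolean {
    if (this.seenKeys.has(action.idempotencyKey)) return false
    this.seenKeys.add(action.idempotencyKey)
    this.rows.push(action)
    return true
  }

  // Queue semantics: oldest row first.
  takeNext(): QueuedAction | undefined {
    return this.rows.shift()
  }

  size(): number {
    return this.rows.length
  }
}

const actionTable = new ActionTable()
const sendToLead: QueuedAction = {
  idempotencyKey: 'c1:node-3', // hypothetical key format
  conversationId: 'c1',
  actionType: 'SEND_LINKEDIN_MESSAGE',
}
const firstInsert = actionTable.addAction(sendToLead)
const retriedInsert = actionTable.addAction(sendToLead) // e.g. a workflow retry after a crash
```

The key stays in the history after the row is taken, so a late retry of an already executed action is still rejected.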
05
7 scenarios. LLMs allowed. 2 min each.
🤖
Claude, ChatGPT, Gemini — whatever you prefer. Good prompting is part of the challenge.
⏰
Read the scenario, diagnose the problem, propose a fix. Then we discuss.
🏆
Based on real incidents and edge cases from this service. Context matters — generic answers won't cut it.
LinkedIn disconnects mid-campaign
A user's LinkedIn identity expires (cookie invalidated) while the call manager has 10 pending LinkedIn actions in the actions queue. The next action the call manager picks up fails with LINKEDIN_IDENTITY_EXPIRED.
actions queue
The call manager catches the LINKEDIN_IDENTITY_EXPIRED error and calls handleLinkedInExpiredIdentity(). This moves all LinkedIn actions from the actions queue to a separate linkedIn queue using pauseQueue().
LinkedIn actions: Moved to linkedIn queue. Parked safely.
Email actions: Stay in actions queue. Processed normally.
On reconnection: unpauseQueue() moves all back to actions.
Key: The queue system ensures LinkedIn issues don't block email processing. Each channel's failures are isolated to their own queue. The identityStateChangedSignal triggers the unpause when the identity comes back.
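The parking behaviour can be modeled as two pure functions over a list of actions. ParkableAction, parkLinkedIn, and unparkLinkedIn are hypothetical names; the real pauseQueue() and unpauseQueue() operate on DB rows.

```typescript
type QueueName = 'actions' | 'linkedIn'
type ParkableAction = { id: number; channel: 'linkedIn' | 'email'; queue: QueueName }

// Moves LinkedIn actions out of the ready queue; email actions are untouched.
function parkLinkedIn(actions: ParkableAction[]): ParkableAction[] {
  return actions.map((a): ParkableAction =>
    a.channel === 'linkedIn' && a.queue === 'actions' ? { ...a, queue: 'linkedIn' } : a,
  )
}

// Moves parked LinkedIn actions back to the ready queue once the identity reconnects.
function unparkLinkedIn(actions: ParkableAction[]): ParkableAction[] {
  return actions.map((a): ParkableAction => (a.queue === 'linkedIn' ? { ...a, queue: 'actions' } : a))
}

const pendingActions: ParkableAction[] = [
  { id: 1, channel: 'linkedIn', queue: 'actions' },
  { id: 2, channel: 'email', queue: 'actions' },
  { id: 3, channel: 'linkedIn', queue: 'actions' },
]
const parkedActions = parkLinkedIn(pendingActions)
const resumedActions = unparkLinkedIn(parkedActions)
```

While LinkedIn actions are parked, a consumer that reads only the actions queue still sees the email action and can process it.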
Lead replies at the worst possible moment
A conversation is executing its campaign graph. The workflow just called executeNode(LINKEDIN) which enqueued a LinkedIn message in the DB. At that exact moment, the lead replies to a previous message. The Refresh Service picks it up and signals NEW_MESSAGES to the conversation workflow, setting BREAK_FLOW = true. Meanwhile, the call manager is about to process the enqueued action.
The action is already in the conversationIdentityAction DB table, and BREAK_FLOW is now true. Does the lead still receive the message?
The lead does not receive the unwanted message. When the call manager picks up the action, the first thing processAction() does is call getConversationStatusAndWaitIfActive(). This refreshes the conversation's messages from LinkedIn and sees the lead already replied. It returns NOT_REFRESHED, throwing ConversationNotUpdatedException — the action is re-enqueued with WAIT, not executed.
Sequence cancelled. Conversation workflow enters reply handling.
getConversationStatusAndWaitIfActive() sees the new messages and aborts.
removeAllActionsForConversation() cleans up orphaned actions after BREAK_FLOW.
Key: This is why getConversationStatusAndWaitIfActive() exists — the time between enqueuing an action and processing it is a dangerous window. The safety check uses lastConversationWorkflowMessageDbId to detect if the conversation state changed since the action was created.
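The staleness check boils down to comparing the message ID recorded at enqueue time with the current one. A minimal sketch, assuming numeric message IDs and hypothetical names (PendingSend, checkBeforeSend); the real check also refreshes messages from LinkedIn and looks at conversation status.

```typescript
type PendingSend = { conversationId: string; lastMessageDbId: number | null }

// The action records the last message the workflow knew about when it enqueued.
// If the DB now shows a different last message, the conversation has moved on.
function checkBeforeSend(action: PendingSend, currentLastMessageDbId: number | null): 'SEND' | 'RE_ENQUEUE' {
  return action.lastMessageDbId === currentLastMessageDbId ? 'SEND' : 'RE_ENQUEUE'
}

const enqueuedSend: PendingSend = { conversationId: 'c1', lastMessageDbId: 41 }

// The lead replied after enqueue, so the latest message ID is now 42.
const outcomeAfterReply = checkBeforeSend(enqueuedSend, 42)
```

The comparison is cheap, which makes it reasonable to run immediately before every send rather than only at enqueue time.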
Friday 5:01 PM
It's Friday at 5:01 PM. The identity's working hours are set to Monday–Friday, 9 AM–5 PM. A conversation workflow just enqueued a LinkedIn message action. The call manager picks it up.
What does getWaitTimeIfOOH() return?
getWaitTimeIfOOH() uses getMillisecondsToNextWorkingHour() to calculate the exact time until the next working window. At Friday 17:01, the next working hour is Monday 9:00 AM — approximately 63 hours and 59 minutes. The call manager sleep()s for that duration.
Call manager sleeps ~64h
Queue in DB, wait in the queue until Monday
Wake up, process all queued actions
Detail: If no working hours match within 7 days (e.g., misconfigured), the function returns 6 days as a safety cap. This prevents the workflow from sleeping forever.
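The Friday 17:01 arithmetic can be checked with a small function. This is a sketch under assumptions, not the real getMillisecondsToNextWorkingHour(): the WorkingHours shape is hypothetical, all times are treated as UTC, and the cap is applied when no window opens within the next 7 calendar days.

```typescript
// Hypothetical config shape: weekdays as 0 (Sunday) to 6 (Saturday), hours in UTC.
interface WorkingHours { days: number[]; startHour: number; endHour: number }

const HOUR_MS = 60 * 60 * 1000
const SAFETY_CAP_MS = 6 * 24 * HOUR_MS

function msToNextWorkingHour(now: Date, hours: WorkingHours): number {
  const t = now.getTime()
  // Look at today and the following 6 days.
  for (let offset = 0; offset < 7; offset++) {
    const dayStart = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + offset)
    if (!hours.days.includes(new Date(dayStart).getUTCDay())) continue
    const open = dayStart + hours.startHour * HOUR_MS
    const close = dayStart + hours.endHour * HOUR_MS
    if (t >= open && t < close) return 0 // already inside a working window
    if (t < open) return open - t // next window opens later
  }
  return SAFETY_CAP_MS // nothing found: sleep a bounded time, then re-check
}

const mondayToFriday: WorkingHours = { days: [1, 2, 3, 4, 5], startHour: 9, endHour: 17 }
const fridayEvening = new Date('2024-03-01T17:01:00Z') // a Friday
const waitUntilMondayMs = msToNextWorkingHour(fridayEvening, mondayToFriday) // 63 h 59 min in ms
```

A real implementation would also need the identity's time zone; UTC is used here only to keep the example deterministic.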
Trace the full flow
An identity has already sent 50 connection requests today (the daily limit). A new conversation workflow reaches a LINKEDIN_CONNECTION node and needs to send request #51.
Trace the flow from executeNode() to the action being rate-limited. What happens at each step?
executeNode(LINKEDIN_CONNECTION) →
sendConnectionRequest() →
addAction(DB) + signal call manager →
getNextAction() →
getActionDecision() →
checks daily limit (50 ≥ 50) →
ENQUEUE(WAIT) →
re-enqueue with waitUntil = midnight →
conversationWorkflow stays blocked on condition()
...same start... →
getActionDecision() →
checks daily limit (50 < 51) + 30-min window (< 5) →
PROCEED →
processAction() →
sends request via LinkedIn API →
replySignal(success) →
conversationWorkflow unblocks →
computeNextEdge()
Note: The re-enqueued action with waitUntil = midnight will be picked up when the next day starts — but only if it's also within working hours. Otherwise, it waits until the next working window.
Activities failing on long conversations
Some conversations that have been running for weeks start failing. The error in Temporal logs shows: Network error while completing activity with a NOT_FOUND error code on respond activity task completion. The activities themselves execute successfully — the data is correct. New conversations work fine.
getConversationState
getConversationState returns the full conversation state including all messages. For long-running conversations, this can grow to hundreds of messages with full content. The serialized result exceeds the gRPC message size limit (4 MB). Temporal can't deliver the oversized result and responds with NOT_FOUND.
Activity result size (long conversation)
> 4 MB
Hundreds of messages with full content
gRPC message size limit
4 MB
Default Temporal Cloud limit
Fix: Reduce the data returned by the activity — don't include full message content in the state, or paginate. Lesson: Activity return values are persisted in event history. Every byte counts for long-lived workflows. The error message (NOT_FOUND) is misleading — it's a size limit, not a missing resource.
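A rough illustration of the fix: return message metadata instead of full content and compare the serialized sizes. The shapes and helpers (StoredMessage, slimSnapshot, approxJsonSize) are hypothetical, and the size is measured in characters of JSON, which only approximates the payload size in bytes.

```typescript
type StoredMessage = { id: number; sentAt: string; content: string }
type ConversationSnapshot = { conversationId: string; messages: StoredMessage[] }

const FOUR_MB = 4 * 1024 * 1024

// Approximate size: characters in the JSON form. Non-ASCII text takes more bytes than this.
function approxJsonSize(value: unknown): number {
  return JSON.stringify(value).length
}

// Keep only what the workflow needs for decisions; content can be loaded inside activities.
function slimSnapshot(snapshot: ConversationSnapshot) {
  return {
    conversationId: snapshot.conversationId,
    messages: snapshot.messages.map(({ id, sentAt }) => ({ id, sentAt })),
  }
}

// Synthetic long conversation: 500 messages of 10,000 characters each.
const longConversation: ConversationSnapshot = {
  conversationId: 'c1',
  messages: Array.from({ length: 500 }, (_, i) => ({
    id: i,
    sentAt: '2024-01-01T00:00:00Z',
    content: 'x'.repeat(10000),
  })),
}

const fullSize = approxJsonSize(longConversation)
const slimSize = approxJsonSize(slimSnapshot(longConversation))
```

A size check like this could also run inside the activity before returning, so an oversized result fails with a descriptive error instead of a misleading NOT_FOUND.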
New feature, old workflows break
A developer adds support for a new campaign node type: LINKEDIN_VIDEO_MESSAGE. They add the new case to executeNode() and deploy. New campaigns with video nodes work perfectly. But the next morning, some existing workflows that have never used video nodes start failing with non-determinism errors.
Why would adding a new case in the switch break workflows that never reach it?
The new case itself is harmless — unreachable code doesn't affect determinism. The problem is the new import. The developer imported a new activity or helper for the video feature. This import changed the workflow bundle — potentially re-ordering or adding function definitions that Temporal uses as markers during replay.
Only workflows that are currently being replayed (e.g., after a worker restart or cache eviction) hit the mismatch. Workflows still in sticky cache execute in-memory and don't replay — they're fine until their next replay.
Wrap the new import and behavior behind patched('add-video-node'). The import itself must be lazy (inside the patched block) or moved to an activity so it doesn't affect the bundle structure.
Lesson: In Temporal, it's not just what you execute that matters for determinism — it's what the bundle contains. Any import that changes the bundle structure can cause non-determinism for replaying workflows, even if the new code is never called.
Manual sends silently failing
A developer refactors the conversationCallManagerWorkflow. As part of the cleanup, they move the setHandler(manualConversationIdentityActionSignal, ...) registration to after the unpauseMultipleQueues() call, since the handler needs identityConversationManager which is created right before. The code looks cleaner — handler next to the object it uses. After deploy, manual sends from the UI silently fail for some call manager workflows. New workflows work fine.
Hint: this is a setHandler for a Temporal update (not a signal). Why would moving setHandler after an activity break existing workflows?
Temporal update handlers registered after a regular activity can be lost during replay. unpauseMultipleQueues() is a regular activity (not local) — it creates a replay boundary. When an existing workflow replays after a worker restart, Temporal replays all events up to the current point. If an update arrives during replay before the handler is registered (because it's now after the activity), the update is silently dropped.
On replay, the workflow re-executes code up to where it left off. unpauseMultipleQueues() is replayed from history (no actual call), but the update handler isn't registered yet at that point in the new code. Any update that arrived during that window is lost.
New workflows execute fresh — no replay. unpauseMultipleQueues() runs for real, the handler registers right after, and all subsequent updates are caught. The bug only manifests when a workflow replays past the activity boundary.
From the actual code comment: "This is done before any activity because updates are lost across activities. Temporal docs say the contrary, but the code says the opposite." — A real Temporal SDK edge case discovered the hard way. The handler must live before any regular activity to guarantee it's registered during replay. Local activities (like localGetIdentityState) don't create this boundary.
Conversation Service @ Enginy