PRD — Step 7: Ingestion Worker
Owner: Damara Carlentine
Branch: feat/integrations-schema
Last updated: 2026-04-15
Parent: integrations-sprint.md
Decisions locked: Q1 same Fly app two loops; Q2 swappable fixture layer per provider.
Goal
Stand up a pgmq consumer in the existing boots-proposal-worker Fly app that pulls items from HubSpot, Google Drive, QuickBooks, and Gmail via their MCP endpoints, lands raw payloads in source_documents, and rolls enriched per-client facts into client_intelligence. Co-exists with the legacy claim_next_job proposal/contract loop in the same process.
1. Embedding model + plumbing
Decision: call boots-brain for embeddings. Single AI vendor surface, single rate-limit budget, embedding model swappable centrally without touching worker code. The brain already holds the ANTHROPIC key and is a Fly app the worker can reach internally.
- Endpoint to add (boots-brain, Step 7 internal scope): `POST /internal/embed` takes `{ texts: string[] }` and returns `{ embeddings: number[][], model: string, dim: 1536 }`. Auth via shared `BRAIN_API_SECRET` header.
- Target dimension: `vector(1536)` — confirmed in `source_documents.embedding` and `client_intelligence.embedding`. Brain must use a 1536-dim model (e.g., OpenAI `text-embedding-3-small`). Brain owns the provider choice; the worker doesn't care.
- Worker env vars (additions to CLAUDE.md required): `BOOTS_BRAIN_URL`, `BRAIN_API_SECRET`. Both already exist in Vercel; mirror to Fly via `fly secrets set -a boots-proposal-worker`.
- Embedding provenance migration: add two columns to `source_documents` so we can detect stale embeddings cheaply on re-sync:

  ```sql
  ALTER TABLE public.source_documents
    ADD COLUMN embedding_model text,
    ADD COLUMN embedding_text_sha256 bytea;
  CREATE INDEX source_documents_embedding_text_sha256_idx
    ON public.source_documents (embedding_text_sha256)
    WHERE embedding_text_sha256 IS NOT NULL;
  ```

  Same two columns on `client_intelligence` for symmetry.
- Batching: the worker batches `texts` in groups of 32 per brain call. Configurable via the `EMBEDDING_BATCH_SIZE` env var (default 32).
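A minimal worker-side sketch of the batching + brain call, under stated assumptions: the function names (`chunk`, `embedBatch`), the options shape, and the `x-brain-secret` header name are all illustrative, not final — only the `/internal/embed` path and request/response shape come from the contract above.

```typescript
interface EmbedResponse { embeddings: number[][]; model: string; dim: number; }

/** Split texts into groups of `size` so each brain call stays under the batch cap. */
export function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}

/** Calls POST /internal/embed once per chunk and concatenates the results. */
export async function embedBatch(
  texts: string[],
  opts: { brainUrl: string; secret: string; batchSize?: number },
): Promise<EmbedResponse> {
  const all: number[][] = [];
  let model = '';
  let dim = 0;
  for (const group of chunk(texts, opts.batchSize ?? 32)) {
    const res = await fetch(`${opts.brainUrl}/internal/embed`, {
      method: 'POST',
      // header name is an assumption; align with whatever the brain endpoint checks
      headers: { 'content-type': 'application/json', 'x-brain-secret': opts.secret },
      body: JSON.stringify({ texts: group }),
    });
    if (!res.ok) throw new Error(`embed failed: ${res.status}`);
    const body = (await res.json()) as EmbedResponse;
    all.push(...body.embeddings);
    model = body.model;
    dim = body.dim;
  }
  return { embeddings: all, model, dim };
}
```

The worker would pull `brainUrl`/`secret` from `BOOTS_BRAIN_URL`/`BRAIN_API_SECRET` and `batchSize` from `EMBEDDING_BATCH_SIZE`.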
2. Token refresh contract
When: on-claim only (not also on-401). Rationale: MCP servers may not surface 401 cleanly across providers; a single refresh path is easier to reason about than two. If we hit 401 mid-stream we mark the message visible-again with the default vt and let the next claim cycle refresh.
- On-claim sequence:
  1. Worker reads the pgmq message → loads the `connected_sources` row.
  2. If `token_expires_at` is `NULL` or within `TOKEN_REFRESH_SKEW_SECONDS` of `now()` (default 60s), refresh.
  3. Refresh acquires advisory lock `pg_try_advisory_xact_lock(hashtext('refresh:' || source_id))` to serialize concurrent refreshes for the same source.
  4. The token write is an optimistic-concurrency update guarded by `WHERE token_expires_at = $previous_value`, so two workers can't both write fresh tokens.
- Failure modes:
  - Refresh itself returns 401 / invalid_grant: mark `connected_sources.status = 'error'`, set `status_message = 'refresh_token_invalid: <provider error>'`, do NOT requeue, archive the pgmq message (move to `pgmq.a_ingestion_queue`). Surfaces as actionable in /settings/integrations: the user must reconnect.
  - Refresh returns 5xx / network error: keep `status = 'active'`, do NOT update vault, increment `ingestion_jobs.items_failed`, leave the message visible-again (no `pgmq.delete`), and let the vt expire so it retries. Max 5 redeliveries (see §4), then dead-letter.
- Token write order (matters for crash recovery): new tokens are written to vault FIRST (new vault rows), THEN `connected_sources.access_token_vault_id` / `refresh_token_vault_id` are swung over in one UPDATE, THEN the old vault rows are soft-orphaned (we leave them; cleanup is a separate sprint). A crash between write and swing = an orphan vault row, no data loss.
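The on-claim staleness check can be sketched as a pure function (name and signature are illustrative; the 60s default mirrors `TOKEN_REFRESH_SKEW_SECONDS` above):

```typescript
/**
 * True when the token has no recorded expiry, or expires within
 * `skewSeconds` of `now` — the two refresh triggers from the on-claim sequence.
 */
export function needsRefresh(
  tokenExpiresAt: Date | null,
  now: Date,
  skewSeconds = 60,
): boolean {
  if (tokenExpiresAt === null) return true;
  return tokenExpiresAt.getTime() - now.getTime() <= skewSeconds * 1000;
}
```

Keeping this pure makes the refresh trigger trivially unit-testable without a clock mock beyond passing `now`.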
3. Idempotency rules
Confirmed against migration 20260415155135_integrations_source_documents_and_ime_link.sql:
- `source_documents` UPSERT key: `UNIQUE (source_id, external_id)` — migration lines 58–59. Worker uses `INSERT ... ON CONFLICT (source_id, external_id) DO UPDATE`.
- `client_intelligence` UPSERT key: `UNIQUE (source_id, external_id) WHERE source_id IS NOT NULL AND external_id IS NOT NULL` — migration lines 93–95. Worker uses `INSERT ... ON CONFLICT (source_id, external_id) WHERE source_id IS NOT NULL AND external_id IS NOT NULL DO UPDATE`. Manually-curated rows (`source_id` NULL) are untouched.
Skip-re-embed rule:
```
new_sha = sha256(normalized_text)
if existing.embedding_text_sha256 = new_sha
   AND existing.embedding_model = current_brain_model:
    UPDATE raw_payload, source_updated_at, updated_at
    SKIP embedding call entirely
else:
    call brain /internal/embed, write embedding + sha + model
```
This is the embedding-cost-runaway guard from §6.
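The skip-re-embed decision above, sketched in TypeScript. The `sha256`/`shouldReEmbed` names and the `existing` record shape are illustrative; the two provenance fields mirror the §6b columns.

```typescript
import { createHash } from 'node:crypto';

/** sha256 of the normalized text, as bytes (matches the bytea column). */
export function sha256(normalizedText: string): Buffer {
  return createHash('sha256').update(normalizedText, 'utf8').digest();
}

/**
 * Re-embed when there is no stored hash, the brain's model changed,
 * or the normalized text no longer matches the stored hash.
 */
export function shouldReEmbed(
  existing: { embeddingTextSha256: Buffer | null; embeddingModel: string | null },
  normalizedText: string,
  currentBrainModel: string,
): boolean {
  if (!existing.embeddingTextSha256 || existing.embeddingModel !== currentBrainModel) {
    return true;
  }
  return !existing.embeddingTextSha256.equals(sha256(normalizedText));
}
```

Note the model check: switching the brain's embedding model invalidates every stored hash match, which is exactly why `embedding_model` is stored alongside the sha.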
Partial-write crash recovery:
- Each item's `(raw upsert + embedding + intelligence rollup)` runs in a single SQL transaction. The brain embedding call happens BEFORE the transaction opens — embeddings are computed, then one transaction writes raw + embedding + intelligence atomically. A crash between embed and write = a wasted embedding call, no DB inconsistency.
- The pgmq message is `pgmq.delete`'d only AFTER the transaction commits. A crash between commit and delete = at most one redelivery; the UPSERT keys make it idempotent.
4. Retry / visibility-timeout policy
- `pgmq.read(queue, vt, qty)` parameters:
  - `vt = 300` seconds (5 min). Rationale: the largest per-item budget is a HubSpot full-sync paginated batch (~a contacts page of 100 + an embed batch of 32). 300s gives a 4–6× safety margin without leaving failures stuck for 30+ minutes.
  - `qty = 1`. Single-message claim keeps the rollup logic simple and lets us run multiple worker concurrency slots without claim contention beyond the advisory lock.
- Max delivery count: 5. Tracked via pgmq's `read_ct` (incremented per re-read). Worker checks `read_ct >= 5` on claim → calls `pgmq.archive(queue, msg_id)` and writes `ingestion_jobs.status = 'failed'` with `error_message = 'max_retries_exceeded'`.
- Dead-letter destination: pgmq's built-in archive table (`pgmq.a_ingestion_queue`, auto-created by `pgmq.create`). Recommended over a custom DLQ table because (a) it's free and already exists, (b) it preserves message + read_ct + enqueued_at, (c) we can replay with `INSERT INTO pgmq.q_ingestion_queue SELECT ... FROM pgmq.a_ingestion_queue WHERE ...`. A custom DLQ would duplicate this for no gain.
- No exponential backoff in v1. vt is fixed at 300s. If a provider is rate-limiting us we'll see it as repeated 429s in `ingestion_jobs.error_details`; backoff is a Step 11 follow-up if it bites.
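The on-claim retry gate is small enough to sketch directly (function name is illustrative; the threshold matches the policy above):

```typescript
export type ClaimAction = 'process' | 'archive';

/**
 * pgmq increments read_ct on every read, so the 5th delivery of a message
 * arrives with read_ct = 5 — that delivery is archived instead of processed.
 */
export function claimAction(readCt: number, maxDeliveries = 5): ClaimAction {
  return readCt >= maxDeliveries ? 'archive' : 'process';
}
```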
5. Per-provider field map
| Provider | MCP method (pull) | Fields pulled | → `source_documents.document_type` | → `client_intelligence` rollup |
|---|---|---|---|---|
| hubspot | `hubspot.crm.contacts.list` (paginated) | `id`, `properties.{firstname, lastname, email, company, jobtitle, lifecyclestage, lastmodifieddate}` | `hubspot.contact` | `client_name = properties.company`, `industry`, `key_facts.contact_role = jobtitle`, `key_facts.lifecycle = lifecyclestage` |
| hubspot | `hubspot.crm.companies.list` | `id`, `properties.{name, domain, industry, numberofemployees, annualrevenue, description}` | `hubspot.company` | `client_name = name`, `industry`, `key_facts.{employees, revenue, domain}`, `summary = description` |
| hubspot | `hubspot.crm.deals.list` | `id`, `properties.{dealname, amount, dealstage, closedate, pipeline}`, `associations.companies` | `hubspot.deal` | rolls up to associated company's `client_intelligence`: `key_facts.open_deals[]`, `key_facts.deal_stage`, `key_facts.deal_amount` |
| google_drive | `drive.files.list` + `drive.files.export` (gdoc → text) | `id`, `name`, `mimeType`, `modifiedTime`, `exported_text` | `google_drive.doc` / `google_drive.sheet` | null — document-first, `source_documents` only. Drive content is too unstructured to roll up cleanly; surfaced via semantic search at generate time. |
| quickbooks | `quickbooks.customers.list` | `Id`, `DisplayName`, `CompanyName`, `PrimaryEmailAddr`, `Balance`, `BillAddr` | `quickbooks.customer` | `client_name = CompanyName ?? DisplayName`, `key_facts.{ar_balance: Balance, billing_address}` |
| quickbooks | `quickbooks.invoices.list` | `Id`, `CustomerRef`, `TotalAmt`, `Balance`, `DueDate`, `TxnDate`, `DocNumber` | `quickbooks.invoice` | rolls up to customer: `key_facts.invoice_history[]` (last 5), `key_facts.total_lifetime_value` (sum) |
| gmail | `gmail.threads.list` + `gmail.threads.get` | `id`, `snippet`, `messages[].{from, to, subject, internalDate, body_text}` | `gmail.thread` | null — document-first, `source_documents` only. Threads enrich proposals via semantic search; rolling up email content into structured client facts is out of scope for v1. |
Explicit: Drive and Gmail intentionally write zero client_intelligence rows. They populate source_documents only. The Researcher agent (Step 9) does semantic search across BOTH tables, so document-only providers still inform proposals.
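One row of the §5 mapping as a pure rollup function, to make the shape concrete — here `hubspot.company`. The `ClientIntelligenceRow` type and function name are illustrative assumptions, not the final schema types:

```typescript
interface HubspotCompanyProps {
  name: string;
  domain?: string;
  industry?: string;
  numberofemployees?: string;
  annualrevenue?: string;
  description?: string;
}

interface ClientIntelligenceRow {
  clientName: string;
  industry?: string;
  summary?: string;
  keyFacts: Record<string, unknown>;
}

/** §5 mapping: hubspot.company properties → client_intelligence rollup. */
export function rollupHubspotCompany(props: HubspotCompanyProps): ClientIntelligenceRow {
  return {
    clientName: props.name,
    industry: props.industry,
    summary: props.description,
    keyFacts: {
      employees: props.numberofemployees,
      revenue: props.annualrevenue,
      domain: props.domain,
    },
  };
}
```

Keeping each provider's rollup as a pure props → row function is what makes the per-provider field-map tests in §8 cheap to write against fixtures.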
6. New migrations needed
Three migrations. Sketches below — production-ready SQL is Step 7's first commit, not this PRD.
6a. SECURITY DEFINER wrappers for pgmq read/delete/archive
supabase/migrations/<ts>_integrations_pgmq_consumer_wrappers.sql
```sql
-- Read N messages, claim with vt_seconds visibility timeout.
-- Parameter is vt_seconds rather than vt: plpgsql would otherwise treat the
-- pgmq.read() argument as ambiguous against the vt output column.
CREATE OR REPLACE FUNCTION public.ingestion_read(
  queue_name text,
  vt_seconds integer DEFAULT 300,
  qty integer DEFAULT 1
)
RETURNS TABLE (msg_id bigint, read_ct integer, enqueued_at timestamptz, vt timestamptz, message jsonb)
LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pgmq
AS $$
BEGIN
  RETURN QUERY SELECT * FROM pgmq.read(queue_name, vt_seconds, qty);
END;
$$;
GRANT EXECUTE ON FUNCTION public.ingestion_read(text, integer, integer) TO service_role;

-- Delete after successful processing.
CREATE OR REPLACE FUNCTION public.ingestion_delete(queue_name text, msg_id bigint)
RETURNS boolean
LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pgmq
AS $$ BEGIN RETURN pgmq.delete(queue_name, msg_id); END; $$;
GRANT EXECUTE ON FUNCTION public.ingestion_delete(text, bigint) TO service_role;

-- Archive after max-retries-exceeded.
CREATE OR REPLACE FUNCTION public.ingestion_archive(queue_name text, msg_id bigint)
RETURNS boolean
LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pgmq
AS $$ BEGIN RETURN pgmq.archive(queue_name, msg_id); END; $$;
GRANT EXECUTE ON FUNCTION public.ingestion_archive(text, bigint) TO service_role;
```
All three follow the same REVOKE-from-anon/authenticated, GRANT-to-service_role pattern as ingestion_enqueue in 20260415220000_integrations_pgmq_enqueue_wrapper.sql.
6b. Embedding provenance columns
supabase/migrations/<ts>_integrations_embedding_provenance.sql
```sql
ALTER TABLE public.source_documents
  ADD COLUMN embedding_model text,
  ADD COLUMN embedding_text_sha256 bytea;

ALTER TABLE public.client_intelligence
  ADD COLUMN embedding_model text,
  ADD COLUMN embedding_text_sha256 bytea;

CREATE INDEX source_documents_embedding_text_sha256_idx
  ON public.source_documents (embedding_text_sha256)
  WHERE embedding_text_sha256 IS NOT NULL;
```
6c. Token-refresh advisory-lock helper (optional, can inline)
If we keep refresh logic in TS we can call pg_try_advisory_xact_lock directly. No migration needed unless we want a named wrapper for clarity. Recommend: skip the wrapper, inline the lock in lib/integrations/token-refresh.ts.
7. Fixture layer design
Q2 locked: swappable per-provider fixtures behind a shared interface. The worker's provider clients all implement McpClient. A useFixtures flag (worker env: INTEGRATIONS_USE_FIXTURES=true|false, default false) selects implementation at boot. Fixtures are TS modules that return canned MCP responses from JSON files in worker/__fixtures__/<provider>/. This lets Jerry write Vitest tests that exercise the full ingest → embed → upsert path without an MCP server, and lets Damara smoke a connect flow against canned data before pointing at real HubSpot.
```ts
// worker/src/integrations/types.ts
export interface McpClient {
  readonly provider: 'hubspot' | 'google_drive' | 'quickbooks' | 'gmail';
  list(method: string, params?: Record<string, unknown>): AsyncIterable<RawItem>;
  get(method: string, id: string): Promise<RawItem>;
}

export interface RawItem {
  externalId: string;
  documentType: string;
  payload: unknown;
  sourceCreatedAt?: Date;
  sourceUpdatedAt?: Date;
}
```

```ts
// worker/src/integrations/factory.ts
// (imports of config and the client classes elided)
export function makeMcpClient(
  provider: McpClient['provider'],
  source: ConnectedSource,
): McpClient {
  if (config.INTEGRATIONS_USE_FIXTURES) {
    return new FixtureClient(provider); // reads worker/__fixtures__/<provider>/*.json
  }
  switch (provider) {
    case 'hubspot': return new HubspotMcpClient(source);
    case 'google_drive': return new GoogleDriveMcpClient(source);
    case 'quickbooks': return new QuickbooksMcpClient(source);
    case 'gmail': return new GmailMcpClient(source);
  }
}
```
Fixture JSON shape mirrors the provider's MCP response verbatim — a fixture is a recorded real response, not a synthesized mock. This means the same fixture file doubles as a Step 10 smoke-test golden record.
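A minimal `FixtureClient` sketch against the `McpClient.list` shape. Assumptions for illustration: the file loader is injectable so unit tests can feed canned data, the default loader reads `worker/__fixtures__/<provider>/*.json`, and each fixture file parses to an array of `RawItem`-shaped records.

```typescript
import { readFileSync, readdirSync } from 'node:fs';
import { join } from 'node:path';

interface RawItem { externalId: string; documentType: string; payload: unknown; }

type FixtureLoader = (provider: string) => RawItem[][];

// Default loader: one inner array per JSON file in the provider's fixture dir.
const fsLoader: FixtureLoader = (provider) => {
  const dir = join('worker', '__fixtures__', provider);
  return readdirSync(dir)
    .filter((f) => f.endsWith('.json'))
    .map((f) => JSON.parse(readFileSync(join(dir, f), 'utf8')) as RawItem[]);
};

export class FixtureClient {
  constructor(
    readonly provider: string,
    private readonly load: FixtureLoader = fsLoader,
  ) {}

  /** Mirrors McpClient.list: yields every record from every fixture file. */
  async *list(): AsyncIterable<RawItem> {
    for (const file of this.load(this.provider)) yield* file;
  }
}
```

The injectable loader keeps Vitest runs filesystem-free while the default path matches the recorded-response fixtures described above.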
8. Test strategy
Unit (Jerry, Step 7 scope)
Each unit test runs against INTEGRATIONS_USE_FIXTURES=true and a mocked Supabase client.
- Embedding-cost-runaway hook — `embeds-only-when-text-changes.test.ts`: ingest the same fixture twice, assert the brain `/internal/embed` mock is called exactly once.
- Token-refresh-race hook — `token-refresh-serializes.test.ts`: simulate two workers claiming messages for the same source with a stale token; assert exactly one refresh call (advisory lock holds) and both proceed with the same fresh token.
- pgmq-message-loss hook — `message-deleted-only-after-commit.test.ts`: stub the DB transaction to throw mid-write; assert `ingestion_delete` was NOT called and the message remains in the queue for redelivery.
- Per-provider field-map tests: one fixture file per provider; assert correct rows land in `source_documents` and (where applicable) `client_intelligence` with the §5 mapping.
- Idempotency: ingest a fixture twice; assert row count unchanged and `updated_at` advanced.
- Max-retries: stub `read_ct = 5`; assert `ingestion_archive` is called and `ingestion_jobs.status = 'failed'`.
Deferred to Step 10 smoke
- Real OAuth round-trip with HubSpot.
- Real MCP call latency / pagination at production volume.
- Brain embedding cost in dollars (we'll measure on the smoke run, set an alert if > $X/1000 docs).
- pgmq behavior under genuine concurrent worker processes (Fly machine restarts, SIGTERM mid-message).
9. File manifest
Grouped by purpose.
Worker entry (modify):
- `/Users/damaracarlentine/boots-proposals/worker/src/index.ts` — add a second loop alongside the existing `claim_next_job` poll. Two `setInterval`-style loops in one process, shared SIGTERM handler.
- `/Users/damaracarlentine/boots-proposals/worker/src/config.ts` — add `BOOTS_BRAIN_URL`, `BRAIN_API_SECRET`, `EMBEDDING_BATCH_SIZE`, `INTEGRATIONS_USE_FIXTURES`, `TOKEN_REFRESH_SKEW_SECONDS`.
Worker handlers (new):
- `worker/src/handlers/ingestion.ts` — main consumer: read → refresh-if-stale → fetch-via-mcp → embed → upsert → delete.
- `worker/src/integrations/types.ts` — `McpClient`, `RawItem`, `IngestionMessage` interfaces.
- `worker/src/integrations/factory.ts` — `makeMcpClient` (real-vs-fixture switch).
- `worker/src/integrations/hubspot.ts` — `HubspotMcpClient` + field-map → rollup function.
- `worker/src/integrations/google-drive.ts` — `GoogleDriveMcpClient` (source_documents only).
- `worker/src/integrations/quickbooks.ts` — `QuickbooksMcpClient` + field-map → rollup function.
- `worker/src/integrations/gmail.ts` — `GmailMcpClient` (source_documents only).
- `worker/src/integrations/fixture-client.ts` — `FixtureClient` reading from `__fixtures__/`.
- `worker/src/integrations/embeddings.ts` — `embedBatch(texts: string[])` calls brain, batches at `EMBEDDING_BATCH_SIZE`.
- `worker/src/integrations/token-refresh.ts` — advisory-lock + optimistic-concurrency refresh, returns fresh access token.
Fixtures (new):
- `worker/__fixtures__/hubspot/` — `contacts.json`, `companies.json`, `deals.json`
- `worker/__fixtures__/google_drive/` — `files.json`, `gdoc-export-sample.txt`
- `worker/__fixtures__/quickbooks/` — `customers.json`, `invoices.json`
- `worker/__fixtures__/gmail/` — `threads.json`
Migrations (new — list, do not write yet):
- `supabase/migrations/<ts>_integrations_pgmq_consumer_wrappers.sql` (§6a)
- `supabase/migrations/<ts>_integrations_embedding_provenance.sql` (§6b)
Boots-brain (new endpoint):
- `boots-brain/app/routes/internal_embed.py` (or wherever routes live) — `POST /internal/embed`.
Tests (new):
- `worker/__tests__/integrations/embeds-only-when-text-changes.test.ts`
- `worker/__tests__/integrations/token-refresh-serializes.test.ts`
- `worker/__tests__/integrations/message-deleted-only-after-commit.test.ts`
- `worker/__tests__/integrations/hubspot.test.ts`
- `worker/__tests__/integrations/quickbooks.test.ts`
- `worker/__tests__/integrations/google-drive.test.ts`
- `worker/__tests__/integrations/gmail.test.ts`
- `worker/__tests__/integrations/idempotency.test.ts`
- `worker/__tests__/integrations/max-retries.test.ts`
Docs (modify):
- `/Users/damaracarlentine/boots-proposals/CLAUDE.md` — under "Required Fly.io env vars → boots-proposal-worker", add: `BOOTS_BRAIN_URL`, `BRAIN_API_SECRET`, `EMBEDDING_BATCH_SIZE` (optional), `INTEGRATIONS_USE_FIXTURES` (optional), `TOKEN_REFRESH_SKEW_SECONDS` (optional).
- `content/docs/prds/integrations-sprint.md` — flip Step 7 to "🟦 In Progress" when work begins.
Damara decisions needed
- Brain `/internal/embed` model choice. OpenAI `text-embedding-3-small` (1536-dim, ~$0.02/1M tokens) vs Voyage `voyage-3-lite` (1024-dim — would force a `vector(1024)` migration; defer) vs OpenAI `text-embedding-3-large` truncated to 1536 (better recall, ~$0.13/1M). Recommend: `text-embedding-3-small` for v1; revisit at Step 10.
- Fly machine sizing for the worker. We're adding a second poll loop plus brain HTTP round-trips per item, and current sizing was set for proposal-only load. Recommend: keep current sizing for Step 7, measure at Step 10, scale before Step 8 ships UI that lets users connect at will.
- Default sync cadence. The migration creates `ingestion_queue` but no scheduler; Step 7 only handles the initial `full_sync` enqueued from the OAuth callback. Nightly delta enqueue (`pg_cron` job) — ship in Step 7 or punt to Step 9? Recommend: punt to Step 9 so Step 7 stays consumer-only; document this gap.
- Gmail/Drive scope cap. An initial full sync of a Gmail account could be 100k+ threads = thousands of dollars in embeddings. Recommend a hard cap for v1: `GMAIL_INITIAL_SYNC_MAX_THREADS=500`, `DRIVE_INITIAL_SYNC_MAX_FILES=200`. Confirm caps or set your own.
- Status-message PII. When refresh fails with provider error text, do we store the raw provider message in `connected_sources.status_message`? It could include user email. Recommend: store the provider error code only, not the free-text body. Confirm.