PRD — Step 7: Ingestion Worker

Owner: Damara Carlentine. Branch: feat/integrations-schema. Last updated: 2026-04-15. Parent: integrations-sprint.md.
Decisions locked: Q1 (same Fly app, two loops); Q2 (swappable fixture layer per provider).


Goal

Stand up a pgmq consumer in the existing boots-proposal-worker Fly app that pulls items from HubSpot, Google Drive, QuickBooks, and Gmail via their MCP endpoints, lands raw payloads in source_documents, and rolls enriched per-client facts into client_intelligence. It co-exists with the legacy claim_next_job proposal/contract loop in the same process.


1. Embedding model + plumbing

Decision: call boots-brain for embeddings. Single AI vendor surface, single rate-limit budget, embedding model swappable centrally without touching worker code. The brain already holds the ANTHROPIC key and is a Fly app the worker can reach internally.

  • Endpoint to add (boots-brain, Step 7 internal scope): POST /internal/embed with body { texts: string[] }, returning { embeddings: number[][], model: string, dim: 1536 }. Auth via shared BRAIN_API_SECRET header.
  • Target dimension: vector(1536) — confirmed in source_documents.embedding and client_intelligence.embedding. Brain must produce 1536-dim vectors (e.g., OpenAI text-embedding-3-small). Brain owns provider choice; worker doesn't care.
  • Worker env vars (additions to CLAUDE.md required): BOOTS_BRAIN_URL, BRAIN_API_SECRET. Both already exist in Vercel; mirror to Fly via fly secrets set -a boots-proposal-worker.
  • Embedding provenance migration: Add two columns to source_documents so we can detect stale embeddings cheaply on re-sync:
    ALTER TABLE public.source_documents
      ADD COLUMN embedding_model       text,
      ADD COLUMN embedding_text_sha256 bytea;
    CREATE INDEX source_documents_embedding_text_sha256_idx
      ON public.source_documents (embedding_text_sha256)
      WHERE embedding_text_sha256 IS NOT NULL;
    
    Same two columns on client_intelligence for symmetry.
  • Batching: worker batches texts in groups of 32 per brain call. Configurable via EMBEDDING_BATCH_SIZE env (default 32).
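The batching bullet above reduces to a small wrapper. A minimal sketch of the planned embedBatch helper (worker/src/integrations/embeddings.ts), with the brain HTTP call injected so the sketch stays self-contained; the exact signature is an assumption, not the final API:

```typescript
// Sketch only: the real helper posts to BOOTS_BRAIN_URL/internal/embed with
// the BRAIN_API_SECRET header. Here the brain call is injected as a function.
type BrainEmbedFn = (texts: string[]) => Promise<number[][]>;

function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}

// Batches texts at EMBEDDING_BATCH_SIZE (default 32), preserving input order
// so embeddings line up with the rows they belong to.
async function embedBatch(
  texts: string[],
  callBrain: BrainEmbedFn,
  batchSize = 32,
): Promise<number[][]> {
  const embeddings: number[][] = [];
  for (const group of chunk(texts, batchSize)) {
    embeddings.push(...(await callBrain(group)));
  }
  return embeddings;
}
```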

2. Token refresh contract

When: on-claim only (not also on-401). Rationale: MCP servers may not surface 401 cleanly across providers; a single refresh path is easier to reason about than two. If we hit 401 mid-stream we mark the message visible-again with the default vt and let the next claim cycle refresh.

  • On-claim sequence:
    1. Worker reads pgmq message → loads connected_sources row.
    2. If token_expires_at is NULL or within TOKEN_REFRESH_SKEW_SECONDS of now() (default 60s), refresh.
    3. Refresh acquires advisory lock pg_try_advisory_xact_lock(hashtext('refresh:' || source_id)) to serialize concurrent refreshes for the same source.
    4. Optimistic-concurrency update guarded by WHERE token_expires_at = $previous_value so two workers can't both write fresh tokens.
  • Failure modes:
    • Refresh itself returns 401 / invalid_grant: mark connected_sources.status = 'error', set status_message = 'refresh_token_invalid: <provider error>', do NOT requeue, archive the pgmq message (move to pgmq.a_ingestion_queue). Surfaces as actionable in /settings/integrations: user must reconnect.
    • Refresh returns 5xx / network: keep status = 'active', do NOT update vault, increment ingestion_jobs.items_failed, leave message visible-again (no pgmq.delete), let vt expire so it retries with backoff. Max 5 redeliveries (see §4) then dead-letter.
  • Token write order (matters for crash recovery): new tokens written to vault FIRST (new vault rows), THEN connected_sources.access_token_vault_id / refresh_token_vault_id swung over in one UPDATE, THEN old vault rows soft-orphaned (we leave them; cleanup is a separate sprint). Crash between write and swing = orphan vault row, no data loss.
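The step-2 staleness check in the on-claim sequence is a pure function of token_expires_at and the skew. A minimal sketch (function and parameter names are illustrative):

```typescript
// Step-2 check from the on-claim sequence: refresh when expiry is unknown
// or falls within TOKEN_REFRESH_SKEW_SECONDS (default 60s) of now.
function isTokenStale(
  tokenExpiresAt: Date | null,
  now: Date,
  skewSeconds = 60,
): boolean {
  if (tokenExpiresAt === null) return true; // no recorded expiry: refresh defensively
  return tokenExpiresAt.getTime() - now.getTime() <= skewSeconds * 1000;
}
```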

3. Idempotency rules

Confirmed against migration 20260415155135_integrations_source_documents_and_ime_link.sql:

  • source_documents UPSERT key: UNIQUE (source_id, external_id) — line 58–59. Worker uses INSERT ... ON CONFLICT (source_id, external_id) DO UPDATE.
  • client_intelligence UPSERT key: UNIQUE (source_id, external_id) WHERE source_id IS NOT NULL AND external_id IS NOT NULL — line 93–95. Worker uses INSERT ... ON CONFLICT (source_id, external_id) WHERE source_id IS NOT NULL AND external_id IS NOT NULL DO UPDATE. Manually-curated rows (source_id NULL) are untouched.

Skip-re-embed rule:

new_sha = sha256(normalized_text)
if existing.embedding_text_sha256 = new_sha
   AND existing.embedding_model = current_brain_model:
     UPDATE raw_payload, source_updated_at, updated_at
     SKIP embedding call entirely
else:
     call brain /internal/embed, write embedding + sha + model

This is the embedding-cost-runaway guard from §6.
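A runnable form of the rule, assuming the embedding_model / embedding_text_sha256 provenance columns from §1; the TS-side field and helper names are illustrative:

```typescript
import { createHash } from "node:crypto";

// Mirrors the embedding_model / embedding_text_sha256 columns (names on the
// TS side are illustrative).
interface EmbeddingProvenance {
  embeddingModel: string | null;
  embeddingTextSha256: Buffer | null;
}

function sha256(text: string): Buffer {
  return createHash("sha256").update(text, "utf8").digest();
}

// True when the brain call can be skipped: same normalized text,
// same model as the one the brain currently serves.
function canSkipEmbedding(
  existing: EmbeddingProvenance | null,
  normalizedText: string,
  currentBrainModel: string,
): boolean {
  if (!existing || !existing.embeddingTextSha256 || !existing.embeddingModel) return false;
  return (
    existing.embeddingModel === currentBrainModel &&
    existing.embeddingTextSha256.equals(sha256(normalizedText))
  );
}
```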

Partial-write crash recovery:

  • Each (raw upsert + embedding + intelligence rollup) runs in a single SQL transaction per item. Brain embedding call happens BEFORE the transaction opens — embeddings are computed, then a transaction writes raw+embedding+intelligence atomically. Crash between embed and write = wasted embedding call, no DB inconsistency.
  • pgmq message is pgmq.delete'd only AFTER the transaction commits. Crash between commit and delete = at-most-one redelivery; UPSERT keys make it idempotent.

4. Retry / visibility-timeout policy

  • pgmq.read(queue, vt, qty) parameters:
    • vt = 300 seconds (5 min). Rationale: largest per-item budget is HubSpot full sync paginated batch (~contacts page of 100 + embed batch of 32). 300s gives 4–6× safety margin without leaving failures stuck for 30+ min.
    • qty = 1. Single-message claim keeps the rollup logic simple and lets us run multiple worker concurrency slots without claim contention beyond the advisory lock.
  • Max delivery count: 5. Tracked via pgmq.read_ct (pgmq increments per re-read). Worker checks read_ct >= 5 on claim → calls pgmq.archive(queue, msg_id) and writes ingestion_jobs.status = 'failed' with error_message = 'max_retries_exceeded'.
  • Dead-letter destination: pgmq's built-in archive table (pgmq.a_ingestion_queue, auto-created by pgmq.create). Recommended over a custom DLQ table because (a) it's free and already exists, (b) preserves message + read_ct + enqueued_at, (c) we can replay with INSERT INTO pgmq.q_ingestion_queue SELECT FROM pgmq.a_ingestion_queue WHERE .... Custom DLQ would duplicate this for no gain.
  • No exponential backoff in v1. vt is fixed at 300s. If a provider is rate-limiting us we'll see it as repeated 429s in ingestion_jobs.error_details; backoff is a Step 11 follow-up if it bites.
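The read_ct gate is a one-line decision at claim time. Sketched here with the two side effects (archive, mark ingestion_jobs failed) folded into the return shape; that shape is an assumption about how the handler will be factored:

```typescript
// Claim-time retry gate from §4. pgmq increments read_ct on every read, so
// a message whose read_ct has reached the max goes straight to the archive
// table instead of being processed again.
interface ClaimDecision {
  action: "process" | "archive";
  markJobFailed: boolean; // ingestion_jobs.status = 'failed', error_message = 'max_retries_exceeded'
}

function decideOnClaim(readCt: number, maxDeliveries = 5): ClaimDecision {
  if (readCt >= maxDeliveries) return { action: "archive", markJobFailed: true };
  return { action: "process", markJobFailed: false };
}
```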

5. Per-provider field map

Each entry lists: MCP method (pull), fields pulled, source_documents.document_type, client_intelligence rollup.

  • hubspot — hubspot.crm.contacts.list (paginated)
    Fields: id, properties.{firstname, lastname, email, company, jobtitle, lifecyclestage, lastmodifieddate}
    document_type: hubspot.contact
    Rollup: client_name = properties.company, industry, key_facts.contact_role = jobtitle, key_facts.lifecycle = lifecyclestage
  • hubspot — hubspot.crm.companies.list
    Fields: id, properties.{name, domain, industry, numberofemployees, annualrevenue, description}
    document_type: hubspot.company
    Rollup: client_name = name, industry, key_facts.{employees, revenue, domain}, summary = description
  • hubspot — hubspot.crm.deals.list
    Fields: id, properties.{dealname, amount, dealstage, closedate, pipeline}, associations.companies
    document_type: hubspot.deal
    Rollup: rolls up to the associated company's client_intelligence: key_facts.open_deals[], key_facts.deal_stage, key_facts.deal_amount
  • google_drive — drive.files.list + drive.files.export (gdoc → text)
    Fields: id, name, mimeType, modifiedTime, exported_text
    document_type: google_drive.doc / google_drive.sheet
    Rollup: none — document-first, source_documents only. Drive content is too unstructured to roll up cleanly; surfaced via semantic search at generate time.
  • quickbooks — quickbooks.customers.list
    Fields: Id, DisplayName, CompanyName, PrimaryEmailAddr, Balance, BillAddr
    document_type: quickbooks.customer
    Rollup: client_name = CompanyName ?? DisplayName, key_facts.{ar_balance: Balance, billing_address}
  • quickbooks — quickbooks.invoices.list
    Fields: Id, CustomerRef, TotalAmt, Balance, DueDate, TxnDate, DocNumber
    document_type: quickbooks.invoice
    Rollup: rolls up to the customer: key_facts.invoice_history[] (last 5), key_facts.total_lifetime_value (sum)
  • gmail — gmail.threads.list + gmail.threads.get
    Fields: id, snippet, messages[].{from, to, subject, internalDate, body_text}
    document_type: gmail.thread
    Rollup: none — document-first, source_documents only. Threads enrich proposals via semantic search; rolling up email content into structured client facts is out of scope for v1.

Explicit: Drive and Gmail intentionally write zero client_intelligence rows. They populate source_documents only. The Researcher agent (Step 9) does semantic search across BOTH tables, so document-only providers still inform proposals.
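As a concrete instance of the hubspot.company mapping above, a rollup function might look like this. Types and names are illustrative; the real client_intelligence row also carries source_id, external_id, embedding, and timestamps:

```typescript
// Illustrative field-map for the hubspot.company entry in §5. Property names
// are the HubSpot defaults listed above; missing properties roll up as null.
interface HubspotCompany {
  id: string;
  properties: {
    name?: string;
    domain?: string;
    industry?: string;
    numberofemployees?: string;
    annualrevenue?: string;
    description?: string;
  };
}

interface IntelligenceRollup {
  externalId: string;
  clientName: string | null;
  industry: string | null;
  keyFacts: Record<string, unknown>;
  summary: string | null;
}

function rollupHubspotCompany(c: HubspotCompany): IntelligenceRollup {
  const p = c.properties;
  return {
    externalId: c.id,
    clientName: p.name ?? null,
    industry: p.industry ?? null,
    keyFacts: {
      employees: p.numberofemployees ?? null,
      revenue: p.annualrevenue ?? null,
      domain: p.domain ?? null,
    },
    summary: p.description ?? null,
  };
}
```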


6. New migrations needed

Three migrations. Sketches below — production-ready SQL is Step 7's first commit, not this PRD.

6a. SECURITY DEFINER wrappers for pgmq read/delete/archive

supabase/migrations/<ts>_integrations_pgmq_consumer_wrappers.sql

-- Read N messages, claim with vt_seconds visibility timeout.
-- Note: the parameter is vt_seconds rather than vt because plpgsql puts IN
-- parameters and RETURNS TABLE columns in one namespace; a vt parameter would
-- collide with the returned vt column and fail at CREATE FUNCTION time.
CREATE OR REPLACE FUNCTION public.ingestion_read(
  queue_name text,
  vt_seconds integer DEFAULT 300,
  qty        integer DEFAULT 1
)
RETURNS TABLE (msg_id bigint, read_ct integer, enqueued_at timestamptz, vt timestamptz, message jsonb)
LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pgmq
AS $$
BEGIN
  RETURN QUERY SELECT * FROM pgmq.read(queue_name, vt_seconds, qty);
END;
$$;
GRANT EXECUTE ON FUNCTION public.ingestion_read(text, integer, integer) TO service_role;

-- Delete after successful processing.
CREATE OR REPLACE FUNCTION public.ingestion_delete(queue_name text, msg_id bigint)
RETURNS boolean
LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pgmq
AS $$ BEGIN RETURN pgmq.delete(queue_name, msg_id); END; $$;
GRANT EXECUTE ON FUNCTION public.ingestion_delete(text, bigint) TO service_role;

-- Archive after max-retries-exceeded.
CREATE OR REPLACE FUNCTION public.ingestion_archive(queue_name text, msg_id bigint)
RETURNS boolean
LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pgmq
AS $$ BEGIN RETURN pgmq.archive(queue_name, msg_id); END; $$;
GRANT EXECUTE ON FUNCTION public.ingestion_archive(text, bigint) TO service_role;

All three follow the same REVOKE-from-anon/authenticated, GRANT-to-service_role pattern as ingestion_enqueue in 20260415220000_integrations_pgmq_enqueue_wrapper.sql.

6b. Embedding provenance columns

supabase/migrations/<ts>_integrations_embedding_provenance.sql

ALTER TABLE public.source_documents
  ADD COLUMN embedding_model       text,
  ADD COLUMN embedding_text_sha256 bytea;
ALTER TABLE public.client_intelligence
  ADD COLUMN embedding_model       text,
  ADD COLUMN embedding_text_sha256 bytea;
CREATE INDEX source_documents_embedding_text_sha256_idx
  ON public.source_documents (embedding_text_sha256)
  WHERE embedding_text_sha256 IS NOT NULL;

6c. Token-refresh advisory-lock helper (optional, can inline)

If we keep refresh logic in TS we can call pg_try_advisory_xact_lock directly. No migration needed unless we want a named wrapper for clarity. Recommend: skip the wrapper, inline the lock in lib/integrations/token-refresh.ts.
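Inlined, the lock is a one-statement query issued inside the refresh transaction (pg_try_advisory_xact_lock releases automatically at commit or rollback, so there is nothing to unlock). A sketch with the query runner injected so it stays self-contained; the runner shape stands in for whatever DB client the worker uses:

```typescript
// Inline advisory-lock acquisition for token refresh, per §2 step 3.
// Must run inside the same transaction as the token update: the xact lock
// is held until that transaction commits or rolls back.
type QueryFn = (sql: string, params: unknown[]) => Promise<{ rows: { locked: boolean }[] }>;

async function tryRefreshLock(runQuery: QueryFn, sourceId: string): Promise<boolean> {
  const { rows } = await runQuery(
    "SELECT pg_try_advisory_xact_lock(hashtext('refresh:' || $1)) AS locked",
    [sourceId],
  );
  return rows[0].locked;
}
```

A worker that fails to get the lock can simply re-read connected_sources: the holder will have written fresh tokens by the time its transaction commits.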


7. Fixture layer design

Q2 locked: swappable per-provider fixtures behind a shared interface. The worker's provider clients all implement McpClient. A useFixtures flag (worker env: INTEGRATIONS_USE_FIXTURES=true|false, default false) selects implementation at boot. Fixtures are TS modules that return canned MCP responses from JSON files in worker/__fixtures__/<provider>/. This lets Jerry write Vitest tests that exercise the full ingest → embed → upsert path without an MCP server, and lets Damara smoke a connect flow against canned data before pointing at real HubSpot.

// worker/src/integrations/types.ts
export interface McpClient {
  readonly provider: 'hubspot' | 'google_drive' | 'quickbooks' | 'gmail';
  list(method: string, params?: Record<string, unknown>): AsyncIterable<RawItem>;
  get(method: string, id: string): Promise<RawItem>;
}

export interface RawItem {
  externalId: string;
  documentType: string;
  payload: unknown;
  sourceCreatedAt?: Date;
  sourceUpdatedAt?: Date;
}

// worker/src/integrations/factory.ts
export function makeMcpClient(
  provider: McpClient['provider'],
  source: ConnectedSource,
): McpClient {
  if (config.INTEGRATIONS_USE_FIXTURES) {
    return new FixtureClient(provider); // reads worker/__fixtures__/<provider>/*.json
  }
  switch (provider) {
    case 'hubspot':      return new HubspotMcpClient(source);
    case 'google_drive': return new GoogleDriveMcpClient(source);
    case 'quickbooks':   return new QuickbooksMcpClient(source);
    case 'gmail':        return new GmailMcpClient(source);
  }
}

Fixture JSON shape mirrors the provider's MCP response verbatim — a fixture is a recorded real response, not a synthesized mock. This means the same fixture file doubles as a Step 10 smoke-test golden record.
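A minimal FixtureClient sketch consistent with the McpClient interface above. The fixture loader is injected here so tests need no filesystem; the real implementation would read worker/__fixtures__/<provider>/*.json. RawItem is re-declared (trimmed) to keep the sketch self-contained:

```typescript
// Trimmed copy of the RawItem shape from types.ts, for a self-contained sketch.
interface RawItem {
  externalId: string;
  documentType: string;
  payload: unknown;
}

// Loader abstraction: the real client maps (provider, method) to a JSON file.
type FixtureLoader = (provider: string, method: string) => RawItem[];

class FixtureClient {
  constructor(
    readonly provider: "hubspot" | "google_drive" | "quickbooks" | "gmail",
    private readonly load: FixtureLoader,
  ) {}

  // Canned pagination: yields every recorded item for the method.
  async *list(method: string): AsyncIterable<RawItem> {
    for (const item of this.load(this.provider, method)) yield item;
  }

  async get(method: string, id: string): Promise<RawItem> {
    const item = this.load(this.provider, method).find((i) => i.externalId === id);
    if (!item) throw new Error(`fixture miss: ${this.provider}/${method}/${id}`);
    return item;
  }
}
```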


8. Test strategy

Unit (Jerry, Step 7 scope)

Each unit test runs against INTEGRATIONS_USE_FIXTURES=true and a mocked Supabase client.

  • Embedding-cost-runaway hook (embeds-only-when-text-changes.test.ts): ingest the same fixture twice, assert the brain /internal/embed mock is called exactly once.
  • Token-refresh-race hook (token-refresh-serializes.test.ts): simulate two workers claiming messages for the same source with a stale token; assert exactly one refresh call (the advisory lock holds) and that both proceed with the same fresh token.
  • pgmq-message-loss hook (message-deleted-only-after-commit.test.ts): stub the DB transaction to throw mid-write; assert ingestion_delete was NOT called and the message remains in the queue for redelivery.
  • Per-provider field-map tests: one fixture file per provider, assert correct rows land in source_documents and (where applicable) client_intelligence with the §5 mapping.
  • Idempotency: ingest fixture twice, assert row count unchanged and updated_at advanced.
  • Max-retries: stub read_ct = 5, assert ingestion_archive called and ingestion_jobs.status = 'failed'.

Deferred to Step 10 smoke

  • Real OAuth round-trip with HubSpot.
  • Real MCP call latency / pagination at production volume.
  • Brain embedding cost in dollars (we'll measure on the smoke run, set an alert if > $X/1000 docs).
  • pgmq behavior under genuine concurrent worker processes (Fly machine restarts, SIGTERM mid-message).

9. File manifest

Grouped by purpose.

Worker entry (modify):

  • /Users/damaracarlentine/boots-proposals/worker/src/index.ts — add second loop alongside existing claim_next_job poll. Two setInterval-style loops in one process, shared SIGTERM handler.
  • /Users/damaracarlentine/boots-proposals/worker/src/config.ts — add BOOTS_BRAIN_URL, BRAIN_API_SECRET, EMBEDDING_BATCH_SIZE, INTEGRATIONS_USE_FIXTURES, TOKEN_REFRESH_SKEW_SECONDS.

Worker handlers (new):

  • worker/src/handlers/ingestion.ts — main consumer: read → refresh-if-stale → fetch-via-mcp → embed → upsert → delete.
  • worker/src/integrations/types.ts — McpClient, RawItem, IngestionMessage interfaces.
  • worker/src/integrations/factory.ts — makeMcpClient (real-vs-fixture switch).
  • worker/src/integrations/hubspot.ts — HubspotMcpClient + field-map → rollup function.
  • worker/src/integrations/google-drive.ts — GoogleDriveMcpClient (source_documents only).
  • worker/src/integrations/quickbooks.ts — QuickbooksMcpClient + field-map → rollup function.
  • worker/src/integrations/gmail.ts — GmailMcpClient (source_documents only).
  • worker/src/integrations/fixture-client.ts — FixtureClient reading from __fixtures__/.
  • worker/src/integrations/embeddings.ts — embedBatch(texts: string[]) calls the brain, batching at EMBEDDING_BATCH_SIZE.
  • worker/src/integrations/token-refresh.ts — advisory-lock + optimistic-concurrency refresh, returns fresh access token.

Fixtures (new):

  • worker/__fixtures__/hubspot/contacts.json, companies.json, deals.json
  • worker/__fixtures__/google_drive/files.json, gdoc-export-sample.txt
  • worker/__fixtures__/quickbooks/customers.json, invoices.json
  • worker/__fixtures__/gmail/threads.json

Migrations (new — list, do not write yet):

  • supabase/migrations/<ts>_integrations_pgmq_consumer_wrappers.sql (§6a)
  • supabase/migrations/<ts>_integrations_embedding_provenance.sql (§6b)

Boots-brain (new endpoint):

  • boots-brain/app/routes/internal_embed.py (or wherever routes live) — POST /internal/embed.

Tests (new):

  • worker/__tests__/integrations/embeds-only-when-text-changes.test.ts
  • worker/__tests__/integrations/token-refresh-serializes.test.ts
  • worker/__tests__/integrations/message-deleted-only-after-commit.test.ts
  • worker/__tests__/integrations/hubspot.test.ts
  • worker/__tests__/integrations/quickbooks.test.ts
  • worker/__tests__/integrations/google-drive.test.ts
  • worker/__tests__/integrations/gmail.test.ts
  • worker/__tests__/integrations/idempotency.test.ts
  • worker/__tests__/integrations/max-retries.test.ts

Docs (modify):

  • /Users/damaracarlentine/boots-proposals/CLAUDE.md — under "Required Fly.io env vars → boots-proposal-worker", add: BOOTS_BRAIN_URL, BRAIN_API_SECRET, EMBEDDING_BATCH_SIZE (optional), INTEGRATIONS_USE_FIXTURES (optional), TOKEN_REFRESH_SKEW_SECONDS (optional).
  • content/docs/prds/integrations-sprint.md — flip Step 7 to "🟦 In Progress" when work begins.

Damara decisions needed

  1. Brain /internal/embed model choice. OpenAI text-embedding-3-small (1536-dim, ~$0.02/1M tokens) vs Voyage voyage-3-lite (1024-dim — would force vector(1024) migration, defer) vs OpenAI text-embedding-3-large truncated to 1536 (better recall, ~$0.13/1M). Recommend: text-embedding-3-small for v1; revisit at Step 10.
  2. Fly machine sizing for worker. Adding a second poll loop + brain HTTP roundtrips per item. Current sizing was set for proposal-only load. Recommend: keep current sizing for Step 7, measure at Step 10, scale before Step 8 ships UI that lets users connect at will.
  3. Default sync cadence. Migration creates ingestion_queue but no scheduler. Step 7 only handles initial full_sync enqueued from OAuth callback. Nightly delta enqueue (pg_cron job) — ship in Step 7 or punt to Step 9? Recommend: punt to Step 9 so Step 7 stays consumer-only; document this gap.
  4. Gmail/Drive scope cap. Initial full sync of a Gmail account could be 100k+ threads = thousands of dollars in embeddings. Recommend a hard cap for v1: GMAIL_INITIAL_SYNC_MAX_THREADS=500, DRIVE_INITIAL_SYNC_MAX_FILES=200. Confirm caps or set your own.
  5. Status-message PII. When refresh fails with provider error text, do we store the raw provider message in connected_sources.status_message? Could include user email. Recommend: store provider error code only, not free-text body. Confirm.
