PRD — Integrations Sprint (IME-First MCP Connectors)

Owner: Damara Carlentine
Branch (active): feat/integrations-schema
Last updated: 2026-04-15
Supersedes: ~/Desktop/PRD-integrations-sprint.md (historical, incomplete)


Architecture

Boots Portal ingests data from HubSpot, Google Drive, QuickBooks, and Gmail via MCP connectors. Ingested data is embedded and stored in the existing IME (Intelligence Memory Engine): client_intelligence holds enriched per-client profiles; source_documents holds raw provider payloads. At proposal-generation time, the Researcher agent performs semantic search over the IME (fast, free) and optionally fires 1–2 live MCP calls for fresh facts. Output proposals cite which source informed which section via source_attribution.

Flow

Connect
  User clicks "Connect HubSpot"
  → OAuth via provider's MCP endpoint
  → Tokens encrypted to Supabase Vault
  → connected_sources row created (status=active, vault refs, mcp_server_url)

Ingest (initial + nightly delta)
  → ingestion_jobs row created
  → pgmq queue pushes work to worker
  → Worker pulls raw items from provider MCP
  → Raw payloads land in source_documents (embedded + normalized_text)
  → Enriched facts rolled up into client_intelligence (one row per client, UPSERT by external_id)
  → ingestion_jobs.status = completed, items_processed updated

Generate
  Researcher agent runs match_client_intelligence + match_source_documents on query embedding
  → Optionally fires 1-2 live MCP calls for fresh facts (e.g., current deal stage)
  → Strategist/Writer/Reviewer consume context
  → source_attribution rows written per knowledge unit used

UI
  /settings/integrations → connect/disconnect cards per provider
  Proposal detail → "Informed by" section listing attributions

Key architectural decisions

  1. Knowledge destination: client_intelligence (per Ace). Already has HNSW index, match_client_intelligence RPC, and worker write-back. org_brain reserved for structured org identity (services, pricing, voice) — not ingestion target.
  2. Raw staging via source_documents (per Ace). Enables re-embed-on-model-change, audit, replay without re-hitting provider rate limits.
  3. source_attribution supports two targets via nullable FKs to client_intelligence and org_brain; exactly one non-null per row (CHECK constraint).
  4. Work queue via pgmq (not custom polling). ingestion_jobs table remains as history/audit; pgmq.ingestion_queue drives the worker.
  5. Scheduled delta syncs via pg_cron (not Vercel cron). Per-org schedules registered on connect.
  6. No wrappers for the 4 providers — verified only boots_analytics_fdw (iceberg) exists. All 4 connectors use MCP.
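Decision 3's "exactly one non-null target" rule can be sketched as DDL. Column names follow this PRD; the actual Step 3 migration may differ in naming and ordering:

```sql
-- Sketch only: dual-target attribution via nullable FKs + CHECK.
ALTER TABLE public.source_attribution
  ADD COLUMN client_intelligence_id uuid
    REFERENCES public.client_intelligence (id) ON DELETE CASCADE;

ALTER TABLE public.source_attribution
  ALTER COLUMN org_brain_id DROP NOT NULL;

-- Exactly one of the two targets must be set per row.
ALTER TABLE public.source_attribution
  ADD CONSTRAINT source_attribution_exactly_one_target CHECK (
    (client_intelligence_id IS NOT NULL)::int +
    (org_brain_id IS NOT NULL)::int = 1
  );
```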

Status Tracker

| # | Step | Status | Notes |
|---|------|--------|-------|
| 1 | Schema foundation | ✅ Done | 20260414212813_integrations_schema_foundation.sql (cd89579) |
| 2 | MCP + Vault columns | ✅ Done | 20260415135345_integrations_mcp_and_vault.sql (5d174ec) |
| 3 | Source documents + IME link | ⬜ Next | Migration below. Adds source_documents, re-points source_attribution, pgmq queue, drops duplicate HNSW index |
| 4 | Vault helper lib | | lib/integrations/vault.ts — store/read token via Supabase Vault |
| 5 | OAuth init route | | POST /api/integrations/[provider]/connect |
| 6 | OAuth callback | | GET /api/integrations/[provider]/callback |
| 7 | Ingestion worker | | pgmq consumer in worker/ — provider-specific pull logic, writes to source_documents + rolls up into client_intelligence |
| 8 | Settings UI | | /settings/integrations — connect/disconnect cards |
| 9 | Agent DAG wiring | | Researcher queries IME + selective live MCP; attribution written post-run |
| 10 | End-to-end smoke test | | Pick HubSpot. Document in docs/runbooks/integrations-smoke-test.md |

Step 3 — Source Documents + IME Link (this PRD's next action)

Migration file: supabase/migrations/20260415155135_integrations_source_documents_and_ime_link.sql

Changes (additive except where noted):

  • Create public.source_documents (raw provider payload, embedded, org-scoped, RLS)
  • Extend public.client_intelligence with source_id, external_id, source_document_id
  • Extend public.source_attribution: add nullable client_intelligence_id; make org_brain_id nullable; drop old UNIQUE; add new UNIQUE with NULLS NOT DISTINCT; add CHECK (exactly one target)
  • HNSW index on source_documents.embedding
  • public.match_source_documents(query_embedding, match_count, filter_org_id, filter_source_id, min_similarity) RPC
  • Drop duplicate index idx_client_intelligence_embedding (keep client_intelligence_embedding_hnsw_cosine)
  • Create pgmq queue ingestion_queue
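A sketch of the match_source_documents RPC listed above. The signature comes from this PRD; the body assumes pgvector cosine distance (`<=>`) and a 1536-dim embedding, neither of which is confirmed here:

```sql
-- Sketch only: semantic search over raw provider documents.
CREATE OR REPLACE FUNCTION public.match_source_documents(
  query_embedding vector(1536),
  match_count int DEFAULT 10,
  filter_org_id uuid DEFAULT NULL,
  filter_source_id uuid DEFAULT NULL,
  min_similarity float DEFAULT 0.0
) RETURNS TABLE (id uuid, normalized_text text, similarity float)
LANGUAGE sql STABLE AS $$
  SELECT d.id,
         d.normalized_text,
         1 - (d.embedding <=> query_embedding) AS similarity
  FROM public.source_documents d
  WHERE (filter_org_id IS NULL OR d.org_id = filter_org_id)
    AND (filter_source_id IS NULL OR d.source_id = filter_source_id)
    AND 1 - (d.embedding <=> query_embedding) >= min_similarity
  ORDER BY d.embedding <=> query_embedding
  LIMIT match_count;
$$;
```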

Deferred (not in this migration):

  • Drop org_brain.embedding column (unused, no index) — separate cleanup sprint, requires grep confirmation no code reads it
  • pg_cron schedule registrations — runtime config per-org, not schema

Step 4 — Vault Helpers

Goal: Server-only TypeScript helpers for OAuth token storage.

Acceptance:

  • lib/integrations/vault.ts exports storeTokenInVault(token) → Promise<uuid> and readTokenFromVault(vaultId) → Promise<string>
  • Uses SUPABASE_SERVICE_ROLE_KEY; never imported into client code
  • Unit tests mock vault.create_secret / vault.decrypted_secrets; real round-trip covered in Step 10 smoke
  • Gate passes
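A minimal sketch of the vault helpers. The client is injected so the unit tests can stub it, per the acceptance criteria; the `create_secret` / `read_secret` RPC names are assumptions (production code would call vault.create_secret and query vault.decrypted_secrets via the service-role client):

```typescript
// Sketch of lib/integrations/vault.ts. RPC names are assumed, not verified.
type RpcClient = {
  rpc: (
    fn: string,
    args: Record<string, unknown>
  ) => Promise<{ data: unknown; error: { message: string } | null }>;
};

export async function storeTokenInVault(db: RpcClient, token: string): Promise<string> {
  const { data, error } = await db.rpc("create_secret", { secret: token });
  if (error) throw new Error(`vault store failed: ${error.message}`);
  return data as string; // vault secret UUID
}

export async function readTokenFromVault(db: RpcClient, vaultId: string): Promise<string> {
  // Real implementation would read vault.decrypted_secrets; "read_secret" is a
  // hypothetical wrapper used here for symmetry.
  const { data, error } = await db.rpc("read_secret", { secret_id: vaultId });
  if (error) throw new Error(`vault read failed: ${error.message}`);
  return data as string;
}
```

Keeping the client as a parameter (rather than importing a singleton) is what makes "unit tests mock vault.create_secret" cheap.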

Step 5 — OAuth Init Route

Goal: One dynamic route that starts OAuth for any supported provider.

Acceptance:

  • POST /api/integrations/[provider]/connect
  • Validates provider against enum (hubspot, google_drive, quickbooks, gmail)
  • Inserts connected_sources row with status=pending, connected_by=auth.uid()
  • Returns authorization URL
  • Provider config map at lib/integrations/providers.ts (client IDs, scopes, MCP endpoints, auth URLs)
  • RLS-aware (authenticated client, not service role)
  • Gate passes
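The provider config map plus URL builder could look like the sketch below. Only HubSpot is shown; the scopes and MCP endpoint are illustrative placeholders, not verified values:

```typescript
// Sketch of lib/integrations/providers.ts. Values are placeholders.
export type Provider = "hubspot" | "google_drive" | "quickbooks" | "gmail";

export interface ProviderConfig {
  authUrl: string;      // provider authorization endpoint
  scopes: string[];     // OAuth scopes requested on connect
  mcpServerUrl: string; // provider MCP endpoint stored on connected_sources
}

export const PROVIDERS: Partial<Record<Provider, ProviderConfig>> = {
  hubspot: {
    authUrl: "https://app.hubspot.com/oauth/authorize",
    scopes: ["crm.objects.contacts.read", "crm.objects.deals.read"],
    mcpServerUrl: "https://example.invalid/hubspot-mcp", // placeholder
  },
  // google_drive, quickbooks, gmail elided
};

export function buildAuthorizeUrl(
  provider: Provider,
  clientId: string,
  redirectUri: string,
  state: string
): string {
  const cfg = PROVIDERS[provider];
  if (!cfg) throw new Error(`unsupported provider: ${provider}`);
  const params = new URLSearchParams({
    client_id: clientId,
    redirect_uri: redirectUri,
    response_type: "code",
    scope: cfg.scopes.join(" "),
    state,
  });
  return `${cfg.authUrl}?${params.toString()}`;
}
```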

Step 6 — OAuth Callback

Goal: Exchange code → tokens → Vault → mark active.

Acceptance:

  • GET /api/integrations/[provider]/callback?code=&state=
  • CSRF: state validated against pending connected_sources row
  • Exchanges code for access + refresh tokens
  • Writes both to Vault, stores vault IDs
  • Sets mcp_server_url, scopes, token_expires_at, status=active
  • Enqueues initial full_sync job: insert ingestion_jobs row + pgmq.send('ingestion_queue', {...})
  • Redirects to /settings/integrations?connected=<provider>
  • Error path: status=error, status_message set, redirect with error flag
  • Gate passes
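The post-exchange bookkeeping can be isolated as a pure function. Column names mirror the connected_sources fields named in this PRD; the vault-ID column names are assumptions, and the token response shape is the standard OAuth 2.0 token-endpoint payload:

```typescript
// Sketch: map a token response onto the connected_sources update.
interface TokenResponse {
  access_token: string;
  refresh_token: string;
  expires_in: number; // seconds until access token expiry
  scope?: string;     // space-delimited scopes, per OAuth 2.0
}

export function connectedSourceUpdate(
  tok: TokenResponse,
  accessVaultId: string,   // returned by the vault helper (Step 4)
  refreshVaultId: string,
  now: Date = new Date()
) {
  return {
    status: "active" as const,
    access_token_vault_id: accessVaultId,   // assumed column name
    refresh_token_vault_id: refreshVaultId, // assumed column name
    scopes: tok.scope ? tok.scope.split(" ") : [],
    token_expires_at: new Date(now.getTime() + tok.expires_in * 1000).toISOString(),
  };
}
```

Taking `now` as a parameter keeps the expiry math deterministic in tests.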

Step 7 — Ingestion Worker

Goal: Background worker that consumes pgmq.ingestion_queue and writes to IME.

Acceptance:

  • Worker polls queue via pgmq.read / pgmq.delete
  • Per provider: pulls raw items, writes to source_documents (raw_payload, normalized_text, embedding)
  • Rolls up enriched per-client facts into client_intelligence (UPSERT on org_id + client_name; populate source_document_id, external_id)
  • Updates ingestion_jobs progress counters; sets status=completed on finish, failed on unrecoverable error
  • Token refresh: if token_expires_at past, refresh via provider endpoint, write new tokens to Vault, update token_expires_at
  • Gate passes (worker has its own test suite in worker/__tests__/)

⚠️ Hidden dependency: Existing worker uses a custom jobs table (claim_next_job RPC). The ingestion worker uses pgmq. Two systems will coexist short-term. Don't unify them in this sprint.
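The consume loop's core invariant is delete-on-success only: pgmq re-delivers unacked messages after the visibility timeout, which is the retry mechanism. A sketch with the queue abstracted for testability (the real worker issues pgmq.read / pgmq.delete over SQL):

```typescript
// Sketch of one drain pass over pgmq.ingestion_queue.
interface QueueMsg {
  msg_id: number;
  message: { job_id: string; provider: string };
}
interface Queue {
  read(batch: number): Promise<QueueMsg[]>; // pgmq.read
  delete(msgId: number): Promise<void>;     // pgmq.delete (ack)
}

export async function drainOnce(
  q: Queue,
  handle: (m: QueueMsg["message"]) => Promise<void>
): Promise<number> {
  const msgs = await q.read(10);
  let done = 0;
  for (const m of msgs) {
    try {
      await handle(m.message);     // pull, write source_documents, roll up IME
      await q.delete(m.msg_id);    // ack only on success
      done++;
    } catch {
      // Leave the message unacked; pgmq makes it visible again for retry.
      // Unrecoverable errors should also set ingestion_jobs.status = failed.
    }
  }
  return done;
}
```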


Step 8 — Settings UI

Goal: Connect/disconnect cards per provider.

Acceptance:

  • /settings/integrations route
  • Card per provider: name, status badge, last synced, item count from ingestion_jobs, Connect/Disconnect button
  • Connect hits Step 5's route, redirects to OAuth URL
  • Disconnect: status=revoked, nullify vault ID columns, and delete the corresponding Vault secrets (nulling the FK columns does not remove the secrets themselves, so the route must delete them explicitly)
  • Brand: white bg, navy (#0D1521), gold (#F7B844)
  • Gate passes
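The status badge on each card maps directly off connected_sources.status. Status values come from this PRD (pending, active, error, revoked); the labels and the error red are assumptions, the gold and navy are the brand colors above:

```typescript
// Sketch: status → badge mapping for the integration cards.
type SourceStatus = "pending" | "active" | "error" | "revoked";

const BADGES: Record<SourceStatus, { label: string; color: string }> = {
  pending:  { label: "Connecting…",  color: "#0D1521" }, // navy
  active:   { label: "Connected",    color: "#F7B844" }, // gold
  error:    { label: "Error",        color: "#B3261E" }, // assumed error red
  revoked:  { label: "Disconnected", color: "#0D1521" }, // navy
};

export const statusBadge = (s: SourceStatus) => BADGES[s];
```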

Step 9 — Agent DAG Wiring

Goal: Researcher pulls from IME + selective live MCP; attribution written post-generation.

Acceptance:

  • In boots-brain/agents/researcher.py (or wherever Researcher lives): query match_client_intelligence + match_source_documents on proposal intent embedding
  • For rows with freshness_threshold_exceeded or queries that need live data (e.g., current deal stage), fire direct MCP call via the connected_sources.mcp_server_url endpoint using token from Vault
  • Pass retrieved context to Strategist/Writer/Reviewer as before
  • After generation, parse which IME rows the Writer's output referenced; write source_attribution row per reference with agent_role='writer', section_name, relevance_score from match
  • No regression when org has zero connections
  • Gate passes (Python tests in boots-brain/tests/)
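The live-vs-cached decision above can be reduced to a small predicate, sketched here in TypeScript for consistency with the other examples even though the real Researcher lives in Python; the freshness-threshold semantics (max age, minimum similarity) are assumptions:

```typescript
// Sketch: should the Researcher fire a live MCP call instead of trusting IME?
interface ImeMatch {
  id: string;
  similarity: number; // from match_client_intelligence / match_source_documents
  updatedAt: Date;    // last ingestion time for this knowledge unit
}

export function needsLiveMcpCall(
  match: ImeMatch | undefined,
  now: Date,
  maxAgeHours = 24,      // assumed freshness threshold
  minSimilarity = 0.75   // assumed confidence floor
): boolean {
  if (!match) return true;                           // nothing cached: go live
  if (match.similarity < minSimilarity) return true; // weak match: verify live
  const ageHours = (now.getTime() - match.updatedAt.getTime()) / 3_600_000;
  return ageHours > maxAgeHours;                     // stale: refresh via MCP
}
```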

Step 10 — End-to-End Smoke Test

Goal: Full loop proven in production with HubSpot.

Acceptance:

  • docs/runbooks/integrations-smoke-test.md documents manual test script
  • Covers: connect → OAuth callback → tokens in Vault → pgmq job runs → source_documents populated → client_intelligence enriched → generate proposal → source_attribution rows written → Sources UI shows attributions
  • Run against prod (gqqzaphchpedaczatonq)
  • Failures filed as follow-up issues
  • Gate passes

Flagged Items

  1. match_client_intelligence RPC is DB-only — no migration file defines it (was applied via dashboard). Not blocking Step 3 but should be captured in a migration file for reproducibility before sprint end.
  2. Duplicate HNSW index idx_client_intelligence_embedding + client_intelligence_embedding_hnsw_cosine both on the same column. Dropped in Step 3 migration.
  3. org_brain.embedding column is dead (no index, no readers). Leaving in place this sprint; drop in a follow-up cleanup migration after grep-confirming zero code readers.
  4. OAuth credentials prerequisite: HUBSPOT_CLIENT_ID/SECRET, GOOGLE_CLIENT_ID/SECRET, QUICKBOOKS_CLIENT_ID/SECRET, GMAIL_CLIENT_ID/SECRET (+ redirect URIs) must exist in Vercel env before Step 5. Provider OAuth apps need to be registered.
  5. Two worker systems coexist temporarily — existing claim_next_job RPC for proposal/contract jobs; pgmq for ingestion. Do not unify in this sprint.
  6. LinkedIn angle (non-technical): "First proposal platform consuming HubSpot's official MCP server." Park for launch week content.

Workflow (every step)

  1. Work branch accumulates on feat/integrations-schema until Step 10 is done (per Damara's "one PR at end" directive)
  2. Migrations: Damara applies via Supabase dashboard, then I commit the .sql file to the branch as audit trail
  3. Gate before each step's commit: npm run lint && npm run typecheck && npm test && npm run build
  4. One step = one commit (or small cluster of related commits)
  5. Final merge to main is one bundled PR after all 10 steps green
