ACDP Control Plane

Event correlation, streaming firehose, and federation IdP for ACDP

Scenario-agnostic control plane for the Agent Context Distribution Protocol (ACDP). Ingests webhook events from ACDP registries, correlates them into runs via the X-Run-Id header, persists raw events + lineage edges, and broadcasts the firehose via Server-Sent Events.

Also acts as the federation IdP: issues bearer JWTs via challenge/sign/token, introspects them per RFC 7662, and revokes them per RFC 7009. The auth surface is tenant-aware (per-tenant API keys), policy-gated, and audited via an append-only hash-chain ledger.

Architecture

              ┌──────────────────────────────────────────┐
              │              ACDP Registry               │
              └────────────────────┬─────────────────────┘
                                   │ POST /ingest/acdp (HMAC, X-Run-Id)

   ┌───────────────────────────────────────────────────────┐
   │                  ACDP Control Plane                   │
   │                                                       │
   │  Guards: Auth → Throttle → Policy → Quota             │
   │       │  (pins req.tenantId, actorDid, scopes)        │
   │       ▼                                               │
   │  IngestController → IngestService (HMAC verify,       │
   │       │              enrollment + domain-pack gate)   │
   │       ▼                                               │
   │  EventProcessorService                                │
   │     ├─ dedup (fingerprint) + persist raw event        │
   │     ├─ upsert run (X-Run-Id)                          │
   │     ├─ insert lineage edges                           │
   │     ├─ upsert agent + registry                        │
   │     ├─ publish per-run + global SSE                   │
   │     └─ fire outbound webhooks (outbox-tracked)        │
   │                                                       │
   │  /runs /events /contexts /agents /capabilities        │
   │  /registries /dashboard /webhooks /domain-packs       │
   │  /routing /auth/* /healthz /readyz /metrics /docs     │
   └───────────────────────────────────────────────────────┘

Every request crosses four guards in order — Auth → Throttle → Policy → Quota — and resolves to a tenant that scopes all reads and writes. See docs/ARCHITECTURE.md.

Quick start

docker compose up -d postgres
npm install
cp .env.example .env
npm run start:dev
# → http://localhost:3001/docs

Testing

npm test                       # unit tests (mocked deps, no DB)
npm run test:integration       # boots app + real Postgres on :5433

See docs/TESTING.md for the test harness, helpers, and how to add a new spec.

Capabilities

Beyond the core ingest → correlate → broadcast pipeline, the control plane adds auth/federation, multi-tenancy, governance, and discovery. Everything below is opt-in via env var; defaults keep single-tenant deployments behavior-identical. Full env reference: docs/CONFIGURATION.md.

Auth / federation — docs/AUTH.md

CapabilityEndpoint(s)Env varNotes
Token issuance (IdP)POST /auth/challenge, POST /auth/tokenTOKEN_ISSUANCE_ENABLED=true, JWT_SECRET (≥32 bytes) or JWT_PRIVATE_KEY_PEM, JWT_SIGNING_ALG=HS256|EdDSA, JWT_AUTHORITY, JWT_AUDIENCE, JWT_TTL_SECONDS, CHALLENGE_TTL_SECONDSChallenge → sign with Ed25519 or ECDSA-P256 → HS256 or EdDSA JWT
Token revocation (RFC 7009)POST /auth/token/revokeAdmin or self-revoke; RFC §2.2 no-oracle behavior
Token introspection (RFC 7662)POST /auth/introspectDispatches on iss so peer-issued tokens are also accepted
Cross-issuer federation(validation) + GET /.well-known/jwks.jsonTRUSTED_ISSUERS=iss|HS256|secret|audience[|scope],... or iss|EdDSA|jwks-url|audienceHS256 (shared secret) and EdDSA (remote JWKS) peers; audience required per entry
Bidirectional revocationserves GET /auth/revocations; consumes peersREVOCATION_FEEDS=issuer|url|admin_token[|poll_seconds],...Issuer-confined, durable per-issuer cursor
did:web key resolution(used by /auth/token fallback)SSRF-guarded, content-type checked, body-capped
Pinned-key directoryPOST /admin/pinned-keys/reloadCONTROL_PLANE_PINNED_KEYS=did=B64KEY[:alg][:from..until],...Admin-reloadable; default alg ed25519
Persistent auth storesAUTH_PERSISTENCE=memory|postgres, AUTH_SWEEP_INTERVAL_SECONDSpostgres required for multi-replica
Issuance audit ledger(same AUTH_PERSISTENCE)Append-only SHA-256 hash chain; tamper detection via verifyChain()

Multi-tenancy — docs/TENANCY.md

CapabilityEnv varNotes
Tenant-scoped API keysTENANT_API_KEYS=tenant-a:key1,tenant-b:key2,bareKeybareKey (no tenant: prefix) binds to default
Tenant-bound agents (JWT claim)TENANT_AGENTS=tenant-a:did:web:…Stamps the tenant claim on issued JWTs
Strict-tenant default-denyAUTH_REQUIRE_TENANT=trueRejects anything resolving only to default; spoofed X-Tenant-Id and explicit default are rejected
Repository-level isolationEvery read filters WHERE tenant_id = ?; writes stamp tenantId (composite conflict keys)

Governance / discovery — docs/POLICY.md

CapabilityEndpoint(s) / hookNotes
Policy engine@CheckPolicy(action) + PolicyGuardStatic-rules or OPA backend (POLICY_BACKEND), caching decider
Per-tenant quota@CheckQuota(action) + QuotaGuardWindowed counters (TENANT_QUOTAS); memory or Redis; 429 + Retry-After
Agent capability registryPOST /capabilities, GET /capabilities/search, GET /capabilities/by-agent/*didURN urn:acdp:cap:<verb>:<type>:<domain>; Ed25519/ECDSA-P256 signed, idempotent
Bandit routingGET /routing/statsThompson sampling over capability-matched arms; reward channel + BANDIT_EXPLORATION_FRACTION
Domain packsGET /domain-packsDOMAIN_PACKS gates ingest context_type (base RFC types always allowed)
Registry enrollmentPOST /registries/enroll, GET /registries/enrollmentsAdmin-only trust anchor; per-registry webhook secret + baseUrl; gates ingest when INGEST_REQUIRE_ENROLLMENT=true

Swagger / docs

Set SWAGGER_ENABLED=true to serve /docs in production (defaults on in dev). SWAGGER_PATH overrides the mount path.

Documentation

Start at the docs index: docs/README.md — it maps the subsystems and the ecosystem (which sibling repo owns what).

DocWhat's in it
docs/README.mdIndex, request-path model, subsystem map, ecosystem & sources of truth
docs/ARCHITECTURE.mdSystem context, module layout, the pipeline, the four-guard chain, SSE strategies, webhook outbox
docs/API.mdFull route reference with request/response shapes
docs/INGEST.mdThe webhook contract: HMAC, run correlation, event shape, enrollment, idempotency
docs/AUTH.mdAuth guard, JWT issuance, did:web, federation, bidirectional revocation
docs/TENANCY.mdTenant resolution, strict mode, reserved-default rule, isolation
docs/POLICY.mdPolicy + quota guards, static/OPA backends, TENANT_QUOTAS
docs/CONFIGURATION.mdComplete env-var reference + startup validation
docs/TESTING.mdUnit + integration test layout and how to write a new spec
docs/TROUBLESHOOTING.mdCommon errors and how to diagnose them
CLAUDE.mdProject conventions for agents working in this repo

This service is part of the ACDP ecosystem — the spec, the acdp SDK, and the registry. See docs/README.md for how they fit together.