GitHub user mlevkov edited a comment on the discussion: Proposal: HTTP Source Connector (Webhook Gateway)
# Proposal: HTTP Source Connector (Webhook Gateway)

## Summary

An HTTP source connector that embeds an HTTP server inside the Source plugin, accepts incoming webhook POST requests, and produces messages to Iggy topics. This is the inverse of the HTTP sink connector ([#2925](https://github.com/apache/iggy/pull/2925)) and completes the HTTP integration pair.

**Key features**:

- Embedded axum HTTP server with configurable bind address and port
- Two routing modes: named topics (`POST /topics/{name}`) and ephemeral endpoints (`POST /e/{random_id}`)
- Event-sourced endpoint registry via a dedicated Iggy config topic — enables hot reload, multi-instance coordination, and endpoint revocation without restart
- Lock-free hot path: `ArcSwap` for registry reads, `crossbeam::queue::ArrayQueue` for the message buffer, no mutex between TCP accept and HTTP 200
- HMAC signature validation (SHA-256, SHA-1) and bearer token auth, configurable per endpoint
- Bounded buffer with HTTP 429 backpressure when full
- Structured concurrency via `CancellationToken` tree for clean shutdown

---

## Motivation

### Gap in the Connector Ecosystem

All three existing source connectors are poll-based — they reach out to external systems:

| Source | Pattern |
|---|---|
| Random | Generate internally |
| PostgreSQL | Outbound query / CDC |
| Elasticsearch | Outbound scroll query |

**Missing**: A push-based source that *accepts* incoming HTTP requests. Webhook ingestion is the dominant pattern for receiving real-time events — SaaS integrations (GitHub, Stripe, Slack), IoT devices, CI/CD callbacks, inter-service events.

Without this, every Iggy user needing webhook → topic routing must build a standalone HTTP server + Iggy producer, duplicating connection management, error handling, backpressure, and graceful shutdown logic.

---

## Architecture: Push-to-Pull Bridge

The Source trait is inherently pull-based (`poll()` returns batches).
The HTTP source bridges push and pull:

```
                      HTTP Source Connector

  ┌──────────────┐        ┌──────────────────┐
  │    Config    │        │     Endpoint     │
  │   Consumer   │───────►│     Registry     │
  │ (Iggy topic) │        │    (ArcSwap,     │
  └──────────────┘        │ lock-free reads) │
                          └────────▲─────────┘
                                   │
                        ┌──────────┴────────────┐
                        │      HTTP Server      │
                        │        (axum)         │
                        │ POST /e/{endpoint_id} │
                        │ POST /topics/{topic}  │
                        │      GET /health      │
                        └──────────┬────────────┘
                                   │
                        ┌──────────▼────────────┐
                        │     Bounded Queue     │
                        │ (crossbeam ArrayQueue)│
                        └──────────┬────────────┘
                                   │
                        ┌──────────▼────────────┐
                        │        poll()         │──► Iggy Producer ──► topic
                        │     drains queue      │
                        └───────────────────────┘

       All tasks coordinated via CancellationToken tree
```

### Three Concurrent Tasks

1. **HTTP Server** — axum server on the configured port. Validates requests against the endpoint registry, enqueues valid payloads, responds immediately.
2. **Config Consumer** — independent `IggyClient` consuming a config topic. On startup, replays from offset 0 to build the endpoint registry, then subscribes for live updates.
3. **poll() loop** — SDK-driven. Drains the bounded queue and returns `ProducedMessages` to the runtime's forwarding loop.
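The bridge between the push side (HTTP handler) and the pull side (`poll()`) can be sketched with nothing but the standard library. This is a dependency-free stand-in: `std::sync::mpsc::sync_channel` plays the role of the lock-free `ArrayQueue` + `Notify` pair described later, and `enqueue`/`drain` are hypothetical helper names, not the connector's API.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Push side (HTTP handler): enqueue a raw body, or report backpressure.
fn enqueue(tx: &SyncSender<Vec<u8>>, body: Vec<u8>) -> Result<(), &'static str> {
    match tx.try_send(body) {
        Ok(()) => Ok(()), // handler would respond HTTP 200 {"status": "queued"}
        Err(TrySendError::Full(_)) => Err("429 buffer full"),
        Err(TrySendError::Disconnected(_)) => Err("503 shutting down"),
    }
}

/// Pull side (poll()): drain up to `max_batch_size` messages without blocking.
fn drain(rx: &Receiver<Vec<u8>>, max_batch_size: usize) -> Vec<Vec<u8>> {
    let mut batch = Vec::with_capacity(max_batch_size);
    while batch.len() < max_batch_size {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            Err(_) => break,
        }
    }
    batch
}

fn main() {
    // buffer_capacity = 2, tiny on purpose to show the backpressure path
    let (tx, rx) = sync_channel::<Vec<u8>>(2);
    assert!(enqueue(&tx, b"event-1".to_vec()).is_ok());
    assert!(enqueue(&tx, b"event-2".to_vec()).is_ok());
    assert!(enqueue(&tx, b"event-3".to_vec()).is_err()); // full → HTTP 429
    assert_eq!(drain(&rx, 100).len(), 2); // poll() drains what was queued
}
```

The real hot path swaps the channel for a pre-allocated lock-free ring buffer; the control flow (non-blocking push with an explicit "full" outcome, batched non-blocking drain) is the same.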
---

## SDK Compatibility Analysis

The Source SDK was designed for poll-based sources but is **fully compatible** with a server-style source:

| Requirement | SDK Support | How |
|---|---|---|
| Background HTTP server | Plugin has own tokio `Runtime` (`sdk/src/lib.rs:59-63`) | `tokio::spawn` axum server in `open()` |
| Background Iggy consumer | Same plugin runtime | Spawn consumer task with own `IggyClient` |
| `poll()` takes `&self` | Source is `Arc`-wrapped | `Arc<SharedState>` with bounded channel |
| Graceful shutdown | `watch::channel` signals poll exit | `CancellationToken` tree propagates to all tasks |
| `close()` needs sole Arc ownership | `Arc::try_unwrap` at `sdk/src/source.rs:130` | Background tasks hold `Arc<SharedState>`, NOT `Arc<Source>` |
| Restart | `stop` → `start` in manager | `close()` releases port; `open()` re-binds |

**Critical constraint**: Background tasks must not clone the `Arc<Source>`. They hold `Arc<SharedState>` instead, which is dropped before `close()` returns.

---

## Endpoint Registry (Event-Sourced)

### Why Not Just Static Config?

Static `/topics/{name}` paths are predictable and not individually revocable. For production webhook gateways:

- Endpoints get compromised and need immediate revocation
- Auth secrets need rotation without downtime
- Multiple connector instances need consistent endpoint state
- New integrations need endpoints created without restarting the connector

### Design

The connector maintains a live endpoint registry backed by a dedicated Iggy config topic. On startup, it replays the topic from offset 0 to reconstruct state, then subscribes for live updates.

**Endpoint types**:

1. **Named topics** (`POST /topics/{name}`) — derived from `[[streams]]` config, always active
2. **Ephemeral endpoints** (`POST /e/{endpoint_id}`) — dynamically registered via the config topic

Ephemeral endpoints use a cryptographically random 32-char hex ID.
The URL itself acts as a bearer token (128 bits of entropy; the same model as Fluvio's webhook gateway and Slack webhook URLs). Each endpoint maps to a target topic with optional per-endpoint HMAC auth.

**Config topic events**:

```json
{"action": "register", "endpoint_id": "a3f8c2e1...", "topic": "github_events", "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256", ...}
{"action": "revoke", "endpoint_id": "a3f8c2e1...", "reason": "compromised"}
{"action": "update", "endpoint_id": "a3f8c2e1...", "auth_secret": "new_secret"}
```

**Graceful fallback**: If the config topic doesn't exist, the connector operates in static-only mode (named topics from TOML, global bearer token auth). The config topic is opt-in.

### Multi-Instance Coordination

All instances consume the same config topic. Since events are ordered, all instances converge to the same registry state:

```
        Config Topic (Iggy)
                 │
    ┌────────────┼────────────┐
    ▼            ▼            ▼
Instance A   Instance B   Instance C
{X, Y, Z}    {X, Y, Z}    {X, Y, Z}
    ▲            ▲            ▲
      Load Balancer distributes traffic
```

Consistency model: eventual (typically <100ms propagation). Acceptable for webhook endpoints — the same model used by DNS and distributed caches.

---

## Lock-Free Hot Path

The HTTP request handling path must be as fast as possible — target <100μs from TCP accept to HTTP 200.

```
TCP accept
  → HTTP parse
  → Route match
  → Registry lookup (ArcSwap::load, ~10ns)
  → Auth validation (constant-time)
  → Body size check
  → ArrayQueue::push (single CAS, ~50ns)
  → Notify::notify_one
  → HTTP 200 response
```

**Why these data structures**:

- **`ArcSwap`** for the endpoint registry: lock-free reads (atomic pointer load), write-rare (config changes swap in a new `Arc<HashMap>`). No per-shard locks like `DashMap`.
- **`crossbeam::queue::ArrayQueue`** for the buffer: lock-free push/pop (CAS-based), pre-allocated ring buffer (zero allocation after construction), bounded (natural backpressure point).
- **`tokio::sync::Notify`** to wake `poll()` when data arrives — avoids busy-polling.

No mutex anywhere between HTTP request arrival and queue enqueue.

### poll() Implementation

```rust
async fn poll(&self) -> Result<ProducedMessages, Error> {
    loop {
        // Register the wakeup future BEFORE draining, so a push that races
        // with the drain still wakes us (prevents a lost-wakeup race).
        let notified = self.notify.notified();

        let mut batch = Vec::with_capacity(self.max_batch_size);
        while let Some(msg) = self.queue.pop() {
            batch.push(msg.into_produced_message());
            if batch.len() >= self.max_batch_size {
                break;
            }
        }
        if !batch.is_empty() {
            return Ok(ProducedMessages { schema: Schema::Raw, messages: batch, state: None });
        }

        // Wait for a notification, a timeout (return an empty batch so the
        // runtime loop stays responsive), or shutdown.
        tokio::select! {
            _ = notified => continue,
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
            _ = self.cancel_token.cancelled() => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
        }
    }
}
```

---

## Configuration

```toml
type = "source"
key = "http"
enabled = true
name = "webhook_gateway"
path = "target/release/libiggy_connector_http_source"

# Source connectors use StreamProducerConfig (singular `topic`).
# Multi-topic routing handled internally by the connector (see Blocker 1).
[[streams]]
stream = "webhooks"
topic = "ingest"
schema = "raw"
batch_length = 100
linger_time = "5ms"

[plugin_config]
bind_address = "0.0.0.0"
port = 9090
max_body_size_bytes = 1048576 # 1MB
buffer_capacity = 10000
max_batch_size = 500

# Config topic (optional — enables endpoint registry + hot reload)
config_topic_stream = "platform"
config_topic_name = "http-source-config"
iggy_connection_string = "tcp://127.0.0.1:8090"

# Global auth (for /topics/{name} endpoints)
auth_bearer_token = "global-webhook-secret"

# Instance identity (for multi-instance setups)
instance_id = "webhook-01"

# HTTP metadata forwarding as Iggy message headers
include_http_metadata = true
forward_headers = ["X-Request-ID", "X-Correlation-ID"]

health_check_enabled = true
```

---

## Request/Response Contract

### Named Topic

```
POST /topics/github
Authorization: Bearer global-webhook-secret

{"event": "push", "repository": "apache/iggy"}
```

```
200 OK
{"status": "queued", "topic": "github", "queue_depth": 42}
```

### Ephemeral Endpoint

```
POST /e/a3f8c2e1b9d04f7a8e6c1d2b3a4f5e6d
X-Hub-Signature-256: sha256=abc123...

{"event": "push", "repository": "apache/iggy"}
```

```
200 OK
{"status": "queued", "endpoint": "a3f8c2e1...", "topic": "github_events"}
```

### Error Responses

| Status | Condition |
|--------|-----------|
| 400 | Body exceeds max size |
| 401 | Auth failure (bearer or HMAC) |
| 404 | Unknown topic or revoked endpoint |
| 410 | Expired endpoint |
| 429 | Buffer full (backpressure) + `Retry-After: 1` |
| 503 | Not ready (startup config replay in progress) |

### Health

```
GET /health

200 OK
{"status": "ok", "instance_id": "webhook-01", "buffer_used": 42, "buffer_capacity": 10000, "named_topics": ["github", "stripe"], "ephemeral_endpoints": 7, "config_offset": 234}
```

---

## HMAC Validation

Per-endpoint HMAC validation over raw request body bytes (never re-serialized JSON):

- **SHA-256** — GitHub, Stripe, most modern providers
- **SHA-1** — legacy providers

Using `ring::hmac::verify` (constant-time comparison). Configured per ephemeral endpoint:

```json
{"action": "register", "endpoint_id": "...", "topic": "github_events", "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256", "hmac_prefix": "sha256=", "auth_secret": "whsec_..."}
```

Defense in depth: the URL is unguessable (128-bit) AND the HMAC signature must be valid. Either can be rotated independently.

---

## Async Acknowledgment

Respond before producing to Iggy:

```
HTTP POST → validate → enqueue (CAS) → HTTP 200    ~15-50μs
                          │
poll() drains queue ──────┘                        async, ~1-5ms later
Iggy producer sends batched
```

Delivery is at-most-once from the caller's perspective: 200 means "queued locally," not "persisted to Iggy." Webhook senders implement retry-on-timeout, providing at-least-once from the caller's side.
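One small piece of plumbing the HMAC section above implies: before `ring::hmac::verify` can compare anything, the connector has to strip the provider prefix (the `hmac_prefix` field, e.g. `sha256=`) and hex-decode the header value into raw tag bytes. A dependency-free sketch; `parse_signature` is our hypothetical helper name:

```rust
/// Strip a provider prefix like "sha256=" and hex-decode the remainder,
/// yielding the raw tag bytes that `ring::hmac::verify` would then compare
/// (in constant time) against the tag computed over the raw body bytes.
fn parse_signature(header: &str, prefix: &str) -> Option<Vec<u8>> {
    let hex = header.strip_prefix(prefix)?;
    // Reject empty, odd-length, or non-hex input (from_str_radix alone
    // would accept a leading '+').
    if hex.is_empty() || hex.len() % 2 != 0 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    hex.as_bytes()
        .chunks(2)
        .map(|pair| u8::from_str_radix(std::str::from_utf8(pair).ok()?, 16).ok())
        .collect()
}

fn main() {
    // GitHub-style header: X-Hub-Signature-256: sha256=<hex>
    assert_eq!(parse_signature("sha256=ab01", "sha256="), Some(vec![0xab, 0x01]));
    assert_eq!(parse_signature("sha1=ab01", "sha256="), None); // wrong scheme
    assert_eq!(parse_signature("sha256=xyz", "sha256="), None); // not hex
}
```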
---

## Comparison: HTTP Sink vs HTTP Source

| Aspect | Sink (#2925) | Source (this proposal) |
|--------|-------------|----------------------|
| Direction | Iggy → HTTP | HTTP → Iggy |
| HTTP role | Client | Server |
| SDK trait | Sink (consume) | Source (poll) |
| Port binding | No | Yes |
| Retry | Connector retries to endpoint | Caller retries to connector |
| Dynamic config | Static TOML | Event-sourced via Iggy topic |
| Dependencies | reqwest, reqwest-middleware | axum, crossbeam, arc-swap, ring |

---

## Architectural Questions for the Team

1. **Own IggyClient inside the connector**: The config consumer creates its own `IggyClient` to consume the config topic — separate from the runtime's producer pipeline. No existing connector does this. Is this acceptable, or would you prefer a runtime-level mechanism for delivering dynamic config to plugins?
2. **Config topic convention**: We propose `platform/http-source-config` for the endpoint registry. Is there an established naming convention for infrastructure topics?
3. **Server-style source**: This is the first push-based source in the ecosystem. All existing sources are poll-based. Any concerns about the push-to-pull bridge pattern (HTTP server → bounded queue → poll drains)?
4. **axum**: Already used in the Iggy server and runtime API. Confirmed as the right choice for the HTTP server?
5. **Multi-stream production**: The runtime's `spawn_source_handler` creates one producer for the last `[[streams]]` entry. Should we target single-stream multi-topic, or would it be worth addressing the multi-stream limitation in the runtime as a separate PR?
---

## Scope

### Included

- Embedded HTTP server (axum)
- Named topic routing (`/topics/{name}`) + ephemeral endpoints (`/e/{id}`)
- Event-sourced endpoint registry via Iggy config topic
- Hot reload (register/revoke/update endpoints without restart)
- HMAC validation (SHA-256, SHA-1) + bearer token auth
- Lock-free bounded buffer with HTTP 429 backpressure
- Multi-instance coordination via shared config topic
- Health/readiness endpoint
- HTTP metadata forwarding as Iggy message headers
- Structured concurrency (CancellationToken)
- Graceful shutdown within 5s

### Not Included

- TLS termination (use a reverse proxy)
- Per-IP rate limiting (LB layer)
- WebSocket support (different protocol)
- Batch POST (follow-up enhancement)
- mTLS (use a sidecar proxy)
- Config topic compaction (not needed at config-event volumes)
- RBAC / authorization — deliberate non-goal (see below)

---

## RBAC: Deliberate Non-Goal

We considered and explicitly rejected building RBAC (Role-Based Access Control) into the connector. The connector performs **authentication** (verify the caller is who they claim to be) but delegates **authorization** (is this caller allowed to perform this action?) to the layers above and below it.

The connector sits between two systems that already have their own access control:

```
HTTP caller → [API Gateway / LB with RBAC] → Connector → [Iggy server with user permissions] → Topic
```

Adding a third authorization layer in the middle creates configuration complexity (three places to manage permissions with no clear source of truth), consistency headaches (permissions across layers diverge), and scope creep (role definitions, permission grants, group management — an entire authorization system that would dwarf the connector).

**How authorization is already handled at each layer**:

| Authorization Concern | Handled By | Mechanism |
|---|---|---|
| Can this caller POST to this endpoint? | API Gateway / LB | IP allowlists, OAuth scopes, WAF rules |
| Can this caller use this specific ephemeral endpoint? | Connector (authentication) | URL is a 128-bit secret + optional HMAC |
| Can this connector produce to this topic? | Iggy server | User permissions on streams/topics |
| Can this user register/revoke endpoints? | Iggy server | Write permission on config topic |

**Key insight**: Ephemeral endpoints with per-endpoint auth are functionally equivalent to per-resource authorization. Each integration gets its own endpoint with its own secret. Revoking the endpoint (via the config topic) is equivalent to revoking access. No role hierarchy needed.

**What would change this**: If Iggy develops a first-party connector authorization framework (e.g., connector-level ACLs managed by the server), the HTTP source should integrate with it rather than maintaining its own.

---

## Estimated Size

~3600-4650 lines across `lib.rs`, `registry.rs`, `config.rs`, `auth.rs`, `types.rs`, tests, README, and integration tests. (Revised upward — the HTTP sink, despite being simpler, came in at 3160 actual lines vs the 1400 estimated.)

---

## Pre-Implementation Review Results

We ran 5 specialized review agents against this design before any code was written: **25 findings (7 CRITICAL, 18 HIGH)**. 17 were fixed in the design, 4 resolved with detailed implementation notes, 3 are architectural blockers requiring team input, and 1 is an inherited runtime limitation.

### Architectural Blockers (Need Team Input)

#### Blocker 1: SDK Single-Topic Routing

`ProducedMessage` has no topic field. The runtime's `source_forwarding_loop` creates one `IggyProducer` for the last `[[streams]]` entry and sends ALL messages from `poll()` to that single topic. Multi-topic routing — the connector's primary value proposition — cannot be expressed through the current SDK contract.
We see four options:

| Option | Approach | Trade-off |
|---|---|---|
| **(a)** | SDK extension — add a `topic` field to `ProducedMessage` | Cleanest, but requires SDK changes + maintainer buy-in |
| **(b)** | Own `IggyClient` for production (bypass forwarding loop) | Connector already needs its own client for config consumption; `poll()` returns empty to satisfy the SDK. SDK transforms bypassed. |
| **(c)** | Single-topic-per-instance | N instances for N topics. Simpler, but eliminates multi-topic routing as a feature. |
| **(d)** | Topic-in-headers convention | Runtime produces to one topic; a downstream consumer re-routes. Adds latency. |

We lean toward **(b)** for the initial implementation, with **(a)** as a follow-up SDK enhancement. What does the team prefer?

#### Blocker 2: Shutdown Data Loss

The runtime's `stop_connector` calls `cleanup_sender` (removing the flume channel) BEFORE calling `iggy_source_close`. Messages drained by `poll()` during shutdown hit a dead callback — the forwarding loop has already exited. Options:

- **(a)** Runtime change: defer `cleanup_sender` until after `iggy_source_close` completes (benefits all source connectors)
- **(b)** Direct produce in `close()` via our own `IggyClient` (if using Blocker 1 option (b), this comes for free)

Would a PR to adjust the shutdown order in `SourceManager::stop_connector` be welcome?

#### Blocker 3: Config Consumer Failure

If the config consumer's Iggy connection drops, the HTTP server continues with a frozen registry — revoked endpoints stay active. We've added health signaling (a `config_consumer_healthy` flag, degraded health status, and optional rejection of ephemeral requests when the registry is stale).

Is this approach acceptable, or would the team prefer a different failure mode?
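For Blocker 3, the health signaling can be as small as an atomic flag flipped by the config consumer task. A sketch under assumed names (`ConfigHealth`, `reject_when_stale`, and `allow_ephemeral` are all hypothetical, mirroring the optional-rejection behavior described above):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical health signal for the config consumer (Blocker 3).
struct ConfigHealth {
    consumer_connected: AtomicBool,
    /// Optional plugin_config switch: refuse ephemeral requests while the
    /// registry may be frozen (revocations can't propagate).
    reject_when_stale: bool,
}

impl ConfigHealth {
    /// Reported via /health: "ok" while the consumer is live, "degraded" otherwise.
    fn status(&self) -> &'static str {
        if self.consumer_connected.load(Ordering::Relaxed) { "ok" } else { "degraded" }
    }

    /// Gate for POST /e/{endpoint_id} while the registry may be stale.
    fn allow_ephemeral(&self) -> bool {
        self.consumer_connected.load(Ordering::Relaxed) || !self.reject_when_stale
    }
}

fn main() {
    let health = ConfigHealth {
        consumer_connected: AtomicBool::new(true),
        reject_when_stale: true,
    };
    assert_eq!(health.status(), "ok");
    assert!(health.allow_ephemeral());

    health.consumer_connected.store(false, Ordering::Relaxed); // connection dropped
    assert_eq!(health.status(), "degraded");
    assert!(!health.allow_ephemeral()); // revocations can no longer be trusted
}
```

Named-topic endpoints would stay available either way, since they come from static TOML rather than the registry.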
### Key Design Corrections From Review

These have already been incorporated into the design above:

- **TOML config**: `topics` (plural, sink format) → `topic` (singular, source format); `schema = "json"` → `"raw"` (arbitrary HTTP bodies)
- **Body size enforcement**: `Content-Length` check replaced with an `axum::DefaultBodyLimit` layer (prevents chunked-transfer DoS)
- **HMAC function**: Fixed `?` operator in a `fn -> bool` (won't compile) → proper `let-else` pattern
- **Notify wakeup**: Register the `notified()` future BEFORE draining the queue (prevents a lost-wakeup race)
- **Endpoint state machine**: Added an `EndpointState` enum (Active/Revoked) — revoked endpoints are kept for audit
- **Secret handling**: `String` → `Zeroizing<Vec<u8>>` for HMAC secrets; no Debug derive on config types
- **Config events**: String discriminator → serde-tagged enum with per-action field validation
- **Response security**: Stripped internal details (buffer capacity, topic names, queue depth) from HTTP responses; split `/health` (public) and `/admin/health` (internal)
- **Config key validation**: `ConfigOverride` enum with allowlisted keys and range validation
- **Effort estimate**: 2500-3100 → 3600-4650 lines

### Additional Type Definitions

**QueuedMessage** (the hot-path bridge between the HTTP handler and poll()):

```rust
struct QueuedMessage {
    topic: String,                          // Target topic, already resolved
    payload: Vec<u8>,                       // Raw HTTP body bytes
    http_metadata: HashMap<String, String>, // Pre-filtered forward headers
    received_at: std::time::Instant,        // For queue-time latency metrics
}
```

**EndpointId** (validated newtype, 128 bits of entropy):

```rust
struct EndpointId(String); // Exactly 32 lowercase hex chars, validated at construction
```

**ConfigEvent** (serde-tagged, compile-time field validation):

```rust
#[derive(Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ConfigEvent {
    Register(RegisterEndpoint),
    Revoke(RevokeEndpoint),
    Update(UpdateEndpoint),
    Config(ConfigOverride), // Allowlisted keys with range validation
}
```

---

Looking forward to your feedback on the design, especially the architectural questions above. Happy to discuss any aspect in detail.

GitHub link: https://github.com/apache/iggy/discussions/3039#discussioncomment-16336298
