GitHub user mlevkov added a comment to the discussion: Proposal: HTTP Source Connector (Webhook Gateway)
# Proposal: HTTP Source Connector (Webhook Gateway)

## Summary

An HTTP source connector that embeds an HTTP server inside the Source plugin, accepts incoming webhook POST requests, and produces messages to Iggy topics. This is the inverse of the HTTP sink connector ([#2925](https://github.com/apache/iggy/pull/2925)) and completes the HTTP integration pair.

**Key features**:

- Embedded axum HTTP server with configurable bind address and port
- Two routing modes: named topics (`POST /topics/{name}`) and ephemeral endpoints (`POST /e/{random_id}`)
- Event-sourced endpoint registry via a dedicated Iggy config topic — enables hot reload, multi-instance coordination, and endpoint revocation without restart
- Lock-free hot path: `ArcSwap` for registry reads, `crossbeam::ArrayQueue` for the message buffer, no mutex between TCP accept and HTTP 200
- HMAC signature validation (SHA-256, SHA-1) and bearer token auth, configurable per endpoint
- Bounded buffer with HTTP 429 backpressure when full
- Structured concurrency via a `CancellationToken` tree for clean shutdown

---

## Motivation

### Gap in the Connector Ecosystem

All three existing source connectors are poll-based — they reach out to external systems:

| Source | Pattern |
|---|---|
| Random | Generate internally |
| PostgreSQL | Outbound query / CDC |
| Elasticsearch | Outbound scroll query |

**Missing**: A push-based source that *accepts* incoming HTTP requests. Webhook ingestion is the dominant pattern for receiving real-time events — SaaS integrations (GitHub, Stripe, Slack), IoT devices, CI/CD callbacks, inter-service events.

Without this, every Iggy user needing webhook → topic routing must build a standalone HTTP server plus an Iggy producer, duplicating connection management, error handling, backpressure, and graceful shutdown logic.

---

## Architecture: Push-to-Pull Bridge

The Source trait is inherently pull-based (`poll()` returns batches).
The HTTP source bridges push and pull:

```
                    HTTP Source Connector

┌──────────────┐        ┌────────────────────────┐
│    Config    │        │   Endpoint Registry    │
│   Consumer   │───────►│       (ArcSwap,        │
│ (Iggy topic) │        │    lock-free reads)    │
└──────────────┘        └───────────▲────────────┘
                                    │ lookup
                        ┌───────────┴────────────┐
                        │      HTTP Server       │
                        │        (axum)          │
                        │ POST /e/{endpoint_id}  │
                        │ POST /topics/{topic}   │
                        │ GET  /health           │
                        └───────────┬────────────┘
                                    │ enqueue
                        ┌───────────▼────────────┐
                        │     Bounded Queue      │
                        │      (crossbeam        │
                        │      ArrayQueue)       │
                        └───────────┬────────────┘
                                    │ drain
                        ┌───────────▼────────────┐
                        │        poll()          │──► Iggy Producer ──► topic
                        │     drains queue       │
                        └────────────────────────┘

          All tasks coordinated via a CancellationToken tree
```

### Three Concurrent Tasks

1. **HTTP Server** — axum server on the configured port. Validates requests against the endpoint registry, enqueues valid payloads, responds immediately.
2. **Config Consumer** — Independent `IggyClient` consuming a config topic. On startup, replays from offset 0 to build the endpoint registry, then subscribes for live updates.
3. **poll() loop** — SDK-driven. Drains the bounded queue and returns `ProducedMessages` to the runtime's forwarding loop.
---

## SDK Compatibility Analysis

The Source SDK was designed for poll-based sources but is **fully compatible** with a server-style source:

| Requirement | SDK Support | How |
|---|---|---|
| Background HTTP server | Plugin has own tokio `Runtime` (`sdk/src/lib.rs:59-63`) | `tokio::spawn` axum server in `open()` |
| Background Iggy consumer | Same plugin runtime | Spawn consumer task with own `IggyClient` |
| `poll()` takes `&self` | Source is `Arc`-wrapped | `Arc<SharedState>` with bounded channel |
| Graceful shutdown | `watch::channel` signals poll exit | `CancellationToken` tree propagates to all tasks |
| `close()` needs sole Arc ownership | `Arc::try_unwrap` at `sdk/src/source.rs:130` | Background tasks hold `Arc<SharedState>`, NOT `Arc<Source>` |
| Restart | `stop` → `start` in manager | `close()` releases port; `open()` re-binds |

**Critical constraint**: Background tasks must not clone the `Arc<Source>`. They hold `Arc<SharedState>` instead, which is dropped before `close()` returns.

---

## Endpoint Registry (Event-Sourced)

### Why Not Just Static Config?

Static `/topics/{name}` paths are predictable and not individually revocable. For production webhook gateways:

- Endpoints get compromised and need immediate revocation
- Auth secrets need rotation without downtime
- Multiple connector instances need consistent endpoint state
- New integrations need endpoints created without restarting the connector

### Design

The connector maintains a live endpoint registry backed by a dedicated Iggy config topic. On startup, it replays the topic from offset 0 to reconstruct state, then subscribes for live updates.

**Endpoint types**:

1. **Named topics** (`POST /topics/{name}`) — derived from `[[streams]]` config, always active
2. **Ephemeral endpoints** (`POST /e/{endpoint_id}`) — dynamically registered via the config topic

Ephemeral endpoints use a cryptographically random 32-char hex ID.
The URL itself acts as a bearer token (128 bits of entropy — the same model as Fluvio's webhook gateway and Slack webhook URLs). Each endpoint maps to a target topic with optional per-endpoint HMAC auth.

**Config topic events**:

```json
{"action": "register", "endpoint_id": "a3f8c2e1...", "topic": "github_events", "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256", ...}
{"action": "revoke", "endpoint_id": "a3f8c2e1...", "reason": "compromised"}
{"action": "update", "endpoint_id": "a3f8c2e1...", "auth_secret": "new_secret"}
```

**Graceful fallback**: If the config topic doesn't exist, the connector operates in static-only mode (named topics from TOML, global bearer token auth). The config topic is opt-in.

### Multi-Instance Coordination

All instances consume the same config topic. Since events are ordered, all instances converge to the same registry state:

```
          Config Topic (Iggy)
           │       │       │
   ┌───────┘       │       └───────┐
   ▼               ▼               ▼
Instance A     Instance B     Instance C
{X, Y, Z}      {X, Y, Z}      {X, Y, Z}
   ▲               ▲               ▲
  Load Balancer distributes traffic
```

Consistency model: eventual (typically <100ms propagation). That is acceptable for webhook endpoints — the same model used by DNS and distributed caches.

---

## Lock-Free Hot Path

The HTTP request handling path must be as fast as possible — target <100μs from TCP accept to HTTP 200.

```
TCP accept
  → HTTP parse
  → Route match
  → Registry lookup (ArcSwap::load, ~10ns)
  → Auth validation (constant-time)
  → Body size check
  → ArrayQueue::push (single CAS, ~50ns)
  → Notify::notify_one
  → HTTP 200 response
```

**Why these data structures**:

- **`ArcSwap`** for the endpoint registry: lock-free reads (atomic pointer load); writes are rare (config changes swap in a new `Arc<HashMap>`). No per-shard locks as in `DashMap`.
- **`crossbeam::ArrayQueue`** for the buffer: lock-free push/pop (CAS-based), pre-allocated ring buffer (zero allocation after construction), bounded (a natural backpressure point).
- **`tokio::sync::Notify`** to wake `poll()` when data arrives — avoids busy-polling.
No mutex anywhere between HTTP request arrival and queue enqueue.

### poll() Implementation

```rust
async fn poll(&self) -> Result<ProducedMessages, Error> {
    loop {
        let mut batch = Vec::with_capacity(self.max_batch_size);
        while let Some(msg) = self.queue.pop() {
            batch.push(msg.into_produced_message());
            if batch.len() >= self.max_batch_size {
                break;
            }
        }
        if !batch.is_empty() {
            return Ok(ProducedMessages {
                schema: Schema::Raw,
                messages: batch,
                state: None,
            });
        }
        // Wait for notification, timeout, or shutdown
        tokio::select! {
            _ = self.notify.notified() => continue,
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
            _ = self.cancel_token.cancelled() => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
        }
    }
}
```

---

## Configuration

```toml
type = "source"
key = "http"
enabled = true
name = "webhook_gateway"
path = "target/release/libiggy_connector_http_source"

[[streams]]
stream = "webhooks"
topics = ["github", "stripe", "generic"]
schema = "json"
batch_length = 100
linger_time = "5ms"

[plugin_config]
bind_address = "0.0.0.0"
port = 9090
max_body_size_bytes = 1048576 # 1MB
buffer_capacity = 10000
max_batch_size = 500

# Config topic (optional — enables endpoint registry + hot reload)
config_topic_stream = "platform"
config_topic_name = "http-source-config"
iggy_connection_string = "tcp://127.0.0.1:8090"

# Global auth (for /topics/{name} endpoints)
auth_bearer_token = "global-webhook-secret"

# Instance identity (for multi-instance setups)
instance_id = "webhook-01"

# HTTP metadata forwarding as Iggy message headers
include_http_metadata = true
forward_headers = ["X-Request-ID", "X-Correlation-ID"]

health_check_enabled = true
```

---

## Request/Response Contract

### Named Topic

```
POST /topics/github
Authorization: Bearer global-webhook-secret

{"event": "push", "repository": "apache/iggy"}
```

```
200 OK

{"status": "queued", "topic": "github", "queue_depth": 42}
```

### Ephemeral Endpoint

```
POST /e/a3f8c2e1b9d04f7a8e6c1d2b3a4f5e6d
X-Hub-Signature-256: sha256=abc123...

{"event": "push", "repository": "apache/iggy"}
```

```
200 OK

{"status": "queued", "endpoint": "a3f8c2e1...", "topic": "github_events"}
```

### Error Responses

| Status | Condition |
|--------|-----------|
| 400 | Body exceeds max size |
| 401 | Auth failure (bearer or HMAC) |
| 404 | Unknown topic or revoked endpoint |
| 410 | Expired endpoint |
| 429 | Buffer full (backpressure); includes `Retry-After: 1` |
| 503 | Not ready (startup config replay in progress) |

### Health

```
GET /health

200 OK

{"status": "ok", "instance_id": "webhook-01", "buffer_used": 42, "buffer_capacity": 10000, "named_topics": ["github", "stripe"], "ephemeral_endpoints": 7, "config_offset": 234}
```

---

## HMAC Validation

Per-endpoint HMAC validation over the raw request body bytes (never re-serialized JSON):

- **SHA-256** — GitHub, Stripe, most modern providers
- **SHA-1** — Legacy providers

Validation uses `ring::hmac::verify` (constant-time comparison), configured per ephemeral endpoint:

```json
{"action": "register", "endpoint_id": "...", "topic": "github_events", "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256", "hmac_prefix": "sha256=", "auth_secret": "whsec_..."}
```

Defense in depth: the URL is unguessable (128-bit) AND the HMAC signature must be valid. Either can be rotated independently.

---

## Async Acknowledgment

The connector responds before producing to Iggy:

```
HTTP POST → validate → enqueue (CAS) → HTTP 200      ~15-50μs
                          │
                          └──► poll() drains queue   async, ~1-5ms later
                               Iggy producer sends batched
```

Delivery is at-most-once from the connector's perspective: 200 means "queued locally," not "persisted to Iggy." Webhook senders implement retry-on-timeout, providing at-least-once from the caller's side.
---

## Comparison: HTTP Sink vs HTTP Source

| Aspect | Sink (#2925) | Source (this proposal) |
|--------|-------------|------------------------|
| Direction | Iggy → HTTP | HTTP → Iggy |
| HTTP role | Client | Server |
| SDK trait | Sink (consume) | Source (poll) |
| Port binding | No | Yes |
| Retry | Connector retries to endpoint | Caller retries to connector |
| Dynamic config | Static TOML | Event-sourced via Iggy topic |
| Dependencies | reqwest, reqwest-middleware | axum, crossbeam, arc-swap, ring |

---

## Architectural Questions for the Team

1. **Own IggyClient inside the connector**: The config consumer creates its own `IggyClient` to consume the config topic — separate from the runtime's producer pipeline. No existing connector does this. Is this acceptable, or would you prefer a runtime-level mechanism for delivering dynamic config to plugins?
2. **Config topic convention**: We propose `platform/http-source-config` for the endpoint registry. Is there an established naming convention for infrastructure topics?
3. **Server-style source**: This is the first push-based source in the ecosystem; all existing sources are poll-based. Any concerns about the push-to-pull bridge pattern (HTTP server → bounded queue → poll drains)?
4. **axum**: Already used in the Iggy server and runtime API. Can we confirm it as the right choice for the HTTP server?
5. **Multi-stream production**: The runtime's `spawn_source_handler` creates one producer for the last `[[streams]]` entry. Should we target single-stream multi-topic, or would it be worth addressing the multi-stream limitation in the runtime as a separate PR?
---

## Scope

### Included

- Embedded HTTP server (axum)
- Named topic routing (`/topics/{name}`) + ephemeral endpoints (`/e/{id}`)
- Event-sourced endpoint registry via an Iggy config topic
- Hot reload (register/revoke/update endpoints without restart)
- HMAC validation (SHA-256, SHA-1) + bearer token auth
- Lock-free bounded buffer with HTTP 429 backpressure
- Multi-instance coordination via a shared config topic
- Health/readiness endpoint
- HTTP metadata forwarding as Iggy message headers
- Structured concurrency (CancellationToken)
- Graceful shutdown within 5s

### Not Included

- TLS termination (use a reverse proxy)
- Per-IP rate limiting (LB layer)
- WebSocket support (different protocol)
- Batch POST (follow-up enhancement)
- mTLS (use a sidecar proxy)
- Config topic compaction (not needed at config-event volumes)

---

## Estimated Size

~2500-3100 lines across `lib.rs`, `registry.rs`, `config.rs`, tests, README, and integration tests.

---

Looking forward to your feedback on the design, especially the architectural questions above. Happy to discuss any aspect in detail.

GitHub link: https://github.com/apache/iggy/discussions/3039#discussioncomment-16336298
