GitHub user mlevkov edited a comment on the discussion: Proposal: HTTP Source 
Connector (Webhook Gateway)

# Proposal: HTTP Source Connector (Webhook Gateway)

## Summary

An HTTP source connector that embeds an HTTP server inside the Source plugin, 
accepts incoming webhook POST requests, and produces messages to Iggy topics. 
This is the inverse of the HTTP sink connector 
([#2925](https://github.com/apache/iggy/pull/2925)) and completes the HTTP 
integration pair.

**Key features**:
- Embedded axum HTTP server with configurable bind address and port
- Two routing modes: named topics (`POST /topics/{name}`) and ephemeral 
endpoints (`POST /e/{random_id}`)
- Event-sourced endpoint registry via a dedicated Iggy config topic — enables 
hot reload, multi-instance coordination, and endpoint revocation without restart
- Lock-free hot path: `ArcSwap` for registry reads, `crossbeam::ArrayQueue` for 
the message buffer, no mutex between TCP accept and HTTP 200
- HMAC signature validation (SHA-256, SHA-1) and bearer token auth, 
configurable per endpoint
- Bounded buffer with HTTP 429 backpressure when full
- Structured concurrency via `CancellationToken` tree for clean shutdown

---

## Motivation

### Gap in the Connector Ecosystem

None of the three existing source connectors accepts inbound traffic: each 
either generates data internally or polls an external system:

| Source | Pattern |
|---|---|
| Random | Generate internally |
| PostgreSQL | Outbound query / CDC |
| Elasticsearch | Outbound scroll query |

**Missing**: A push-based source that *accepts* incoming HTTP requests. Webhook 
ingestion is the dominant pattern for receiving real-time events — SaaS 
integrations (GitHub, Stripe, Slack), IoT devices, CI/CD callbacks, 
inter-service events. Without this, every Iggy user needing webhook → topic 
routing must build a standalone HTTP server + Iggy producer, duplicating 
connection management, error handling, backpressure, and graceful shutdown 
logic.

---

## Architecture: Push-to-Pull Bridge

The Source trait is inherently pull-based (`poll()` returns batches). The HTTP 
source bridges push and pull:

```
  ┌─────────────────────────────────────────────────────────────────────────┐
  │  HTTP Source Connector                                                  │
  │                                                                         │
  │  ┌─────────────┐    ┌──────────────────┐    ┌───────────────────────┐  │
  │  │ Config       │    │ Endpoint         │    │ HTTP Server           │  │
  │  │ Consumer     │───►│ Registry         │◄───│ (axum)                │  │
  │  │ (Iggy topic) │    │ (ArcSwap)        │    │                       │  │
  │  └─────────────┘    │ lock-free reads   │    │ POST /e/{endpoint_id} │  │
  │                      └──────────────────┘    │ POST /topics/{topic}  │  │
  │                                               │ GET  /health          │  │
  │                                               └──────────┬────────────┘  │
  │                                                          │               │
  │                      ┌──────────────────┐                │               │
  │                      │ Bounded Queue    │◄───────────────┘               │
  │                      │ (crossbeam       │                                │
  │                      │  ArrayQueue)     │                                │
  │                      └────────┬─────────┘                                │
  │                               │                                          │
  │                      ┌────────▼─────────┐                                │
  │                      │ poll()           │──────►  Iggy Producer          │
  │                      │ drains queue     │         ──► topic              │
  │                      └──────────────────┘                                │
  │                                                                         │
  │  All tasks coordinated via CancellationToken tree                       │
  └─────────────────────────────────────────────────────────────────────────┘
```

### Three Concurrent Tasks

1. **HTTP Server** — axum server on configured port. Validates requests against 
the endpoint registry, enqueues valid payloads, responds immediately.
2. **Config Consumer** — Independent `IggyClient` consuming a config topic. On 
startup, replays from offset 0 to build the endpoint registry. Then subscribes 
for live updates.
3. **poll() loop** — SDK-driven. Drains the bounded queue and returns 
`ProducedMessages` to the runtime's forwarding loop.
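
A minimal sketch of the hand-off between tasks 1 and 3, using only the standard 
library. `std::sync::mpsc::sync_channel` stands in for `crossbeam::ArrayQueue` 
purely so the example runs without external crates; it is not a claim about the 
final data structure. The `Enqueue` enum and both function names are 
hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome the HTTP handler would translate into a status code.
#[derive(Debug, PartialEq)]
enum Enqueue {
    Queued,       // respond 200
    BufferFull,   // respond 429 with Retry-After
    ShuttingDown, // the draining side has gone away
}

/// Push side: called from the request handler; never blocks.
fn try_enqueue(tx: &SyncSender<Vec<u8>>, body: Vec<u8>) -> Enqueue {
    match tx.try_send(body) {
        Ok(()) => Enqueue::Queued,
        Err(TrySendError::Full(_)) => Enqueue::BufferFull,
        Err(TrySendError::Disconnected(_)) => Enqueue::ShuttingDown,
    }
}

/// Pull side: what `poll()` does, draining at most `max_batch_size` payloads.
fn drain_batch(rx: &Receiver<Vec<u8>>, max_batch_size: usize) -> Vec<Vec<u8>> {
    let mut batch = Vec::with_capacity(max_batch_size);
    while batch.len() < max_batch_size {
        match rx.try_recv() {
            Ok(body) => batch.push(body),
            Err(_) => break, // queue is empty or the sender is gone
        }
    }
    batch
}
```

With a capacity of 2, a third `try_enqueue` reports `BufferFull` until 
`drain_batch` frees a slot, which is the point where the handler would answer 
with 429.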

---

## SDK Compatibility Analysis

The Source SDK was designed for poll-based sources but is **fully compatible** 
with a server-style source:

| Requirement | SDK Support | How |
|---|---|---|
| Background HTTP server | Plugin has own tokio `Runtime` (`sdk/src/lib.rs:59-63`) | `tokio::spawn` axum server in `open()` |
| Background Iggy consumer | Same plugin runtime | Spawn consumer task with own `IggyClient` |
| `poll()` takes `&self` | Source is `Arc`-wrapped | `Arc<SharedState>` with bounded queue |
| Graceful shutdown | `watch::channel` signals poll exit | `CancellationToken` tree propagates to all tasks |
| `close()` needs sole Arc ownership | `Arc::try_unwrap` at `sdk/src/source.rs:130` | Background tasks hold `Arc<SharedState>`, NOT `Arc<Source>` |
| Restart | `stop` → `start` in manager | `close()` releases port; `open()` re-binds |

**Critical constraint**: Background tasks must not clone the `Arc<Source>`. 
They hold `Arc<SharedState>` instead, which is dropped before `close()` returns.
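
The ownership constraint can be illustrated with a small standard-library 
sketch. `HttpSource`, `SharedState`, `close`, and `spawn_background` are 
hypothetical stand-ins, and a plain thread replaces the tokio task; only the 
behaviour of `Arc::try_unwrap` is taken as given.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-ins for the connector's types; names are illustrative.
struct SharedState {
    name: String,
}

struct HttpSource {
    shared: Arc<SharedState>,
}

/// Mimics the shutdown step: succeeds only if no other `Arc<HttpSource>` exists.
fn close(source: Arc<HttpSource>) -> Result<HttpSource, Arc<HttpSource>> {
    Arc::try_unwrap(source)
}

/// A background task that captures only the shared state, never the source.
fn spawn_background(shared: Arc<SharedState>) -> thread::JoinHandle<usize> {
    thread::spawn(move || shared.name.len())
}
```

Cloning `source.shared` into a task leaves the strong count of the outer `Arc` 
at one, so `close` succeeds; cloning the outer `Arc` itself would make it fail.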

---

## Endpoint Registry (Event-Sourced)

### Why Not Just Static Config?

Static `/topics/{name}` paths are predictable and not individually revocable. 
For production webhook gateways:

- Endpoints get compromised and need immediate revocation
- Auth secrets need rotation without downtime
- Multiple connector instances need consistent endpoint state
- New integrations need endpoints created without restarting the connector

### Design

The connector maintains a live endpoint registry backed by a dedicated Iggy 
config topic. On startup, it replays the topic from offset 0 to reconstruct 
state. Then subscribes for live updates.

**Endpoint types**:

1. **Named topics** (`POST /topics/{name}`) — derived from `[[streams]]` 
config, always active
2. **Ephemeral endpoints** (`POST /e/{endpoint_id}`) — dynamically registered 
via config topic

Ephemeral endpoints use a cryptographically random 32-char hex ID, so the URL 
itself acts as a bearer token with 128 bits of entropy (the same approach as 
Fluvio's webhook gateway and Slack webhook URLs). Each endpoint maps to a 
target topic with optional per-endpoint HMAC auth.
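
A framework-independent sketch of how the two path shapes could be told apart. 
In the connector itself the axum router would do the matching; `Route`, 
`parse_route`, and `is_valid_endpoint_id` are hypothetical names, and 
restricting IDs to lowercase hex is an assumption.

```rust
/// The two routing modes from the proposal.
#[derive(Debug, PartialEq)]
enum Route<'a> {
    NamedTopic(&'a str),
    Ephemeral(&'a str),
}

/// An ephemeral ID is 32 hex chars: 32 * 4 bits = 128 bits.
/// Assumption: IDs are always emitted in lowercase.
fn is_valid_endpoint_id(id: &str) -> bool {
    id.len() == 32 && id.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Maps a request path to a route, or `None` if it matches neither mode.
fn parse_route(path: &str) -> Option<Route<'_>> {
    if let Some(name) = path.strip_prefix("/topics/") {
        // Reject empty names and nested paths.
        if !name.is_empty() && !name.contains('/') {
            return Some(Route::NamedTopic(name));
        }
        return None;
    }
    if let Some(id) = path.strip_prefix("/e/") {
        if is_valid_endpoint_id(id) {
            return Some(Route::Ephemeral(id));
        }
    }
    None
}
```

Rejecting malformed IDs before the registry lookup keeps obviously invalid 
requests off the lookup path, though the registry remains the source of truth 
for whether an endpoint exists.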

**Config topic events**:

```json
{"action": "register", "endpoint_id": "a3f8c2e1...", "topic": "github_events",
 "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256", ...}

{"action": "revoke", "endpoint_id": "a3f8c2e1...", "reason": "compromised"}

{"action": "update", "endpoint_id": "a3f8c2e1...", "auth_secret": "new_secret"}
```

**Graceful fallback**: If the config topic doesn't exist, the connector 
operates in static-only mode (named topics from TOML, global bearer token 
auth). The config topic is opt-in.
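
Replaying the config topic amounts to folding events into a map. The sketch 
below assumes the events have already been deserialized; `ConfigEvent`, 
`Endpoint`, `apply`, and `replay` are hypothetical names, only a subset of the 
JSON fields is modelled, and treating a revoke or update of an unknown endpoint 
as a no-op is an assumption rather than settled behaviour.

```rust
use std::collections::HashMap;

// Hypothetical in-memory form of the config topic events shown above.
#[derive(Debug, Clone, PartialEq)]
enum ConfigEvent {
    Register { endpoint_id: String, topic: String, auth_secret: Option<String> },
    Revoke { endpoint_id: String },
    Update { endpoint_id: String, auth_secret: String },
}

#[derive(Debug, Clone, PartialEq)]
struct Endpoint {
    topic: String,
    auth_secret: Option<String>,
}

type Registry = HashMap<String, Endpoint>;

/// Applies one event. Unknown IDs in `Revoke`/`Update` are ignored (assumption).
fn apply(registry: &mut Registry, event: &ConfigEvent) {
    match event {
        ConfigEvent::Register { endpoint_id, topic, auth_secret } => {
            let endpoint = Endpoint { topic: topic.clone(), auth_secret: auth_secret.clone() };
            registry.insert(endpoint_id.clone(), endpoint);
        }
        ConfigEvent::Revoke { endpoint_id } => {
            registry.remove(endpoint_id);
        }
        ConfigEvent::Update { endpoint_id, auth_secret } => {
            if let Some(endpoint) = registry.get_mut(endpoint_id) {
                endpoint.auth_secret = Some(auth_secret.clone());
            }
        }
    }
}

/// Rebuilds the registry by replaying the log from offset 0.
fn replay(events: &[ConfigEvent]) -> Registry {
    let mut registry = Registry::new();
    for event in events {
        apply(&mut registry, event);
    }
    registry
}
```

Because `apply` is deterministic, any two instances that replay the same 
ordered log end up with the same registry, and live updates reuse `apply` on 
each new event.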

### Multi-Instance Coordination

All instances consume the same config topic. Since events are ordered, all 
instances converge to the same registry state:

```
              Config Topic (Iggy)
                 │    │    │
        ┌────────┘    │    └────────┐
        ▼             ▼             ▼
   Instance A    Instance B    Instance C
   {X, Y, Z}    {X, Y, Z}    {X, Y, Z}
        ▲             ▲             ▲
        Load Balancer distributes traffic
```

Consistency model: eventual (typically <100ms propagation). Acceptable for 
webhook endpoints — same model used by DNS and distributed caches.

---

## Lock-Free Hot Path

The HTTP request handling path must be as fast as possible — target <100μs from 
TCP accept to HTTP 200.

```
TCP accept → HTTP parse → Route match → Registry lookup (ArcSwap::load, ~10ns)
  → Auth validation (constant-time) → Body size check → ArrayQueue::push (single CAS, ~50ns)
  → Notify::notify_one → HTTP 200 response
```

**Why these data structures**:

- **`ArcSwap`** for the endpoint registry: lock-free reads (atomic pointer 
load), write-rare (config changes swap in a new `Arc<HashMap>`). Unlike 
`DashMap`, there are no per-shard locks.
- **`crossbeam::ArrayQueue`** for the buffer: lock-free push/pop (CAS-based), 
pre-allocated ring buffer (zero allocation after construction), bounded 
(natural backpressure point).
- **`tokio::sync::Notify`** to wake poll() when data arrives — avoids 
busy-polling.

No mutex anywhere between HTTP request arrival and queue enqueue.

### poll() Implementation

```rust
use std::time::Duration;

async fn poll(&self) -> Result<ProducedMessages, Error> {
    loop {
        // Drain up to `max_batch_size` messages without blocking.
        let mut batch = Vec::with_capacity(self.max_batch_size);
        while batch.len() < self.max_batch_size {
            match self.queue.pop() {
                Some(msg) => batch.push(msg.into_produced_message()),
                None => break,
            }
        }
        if !batch.is_empty() {
            return Ok(ProducedMessages { schema: Schema::Raw, messages: batch, state: None });
        }
        // Queue is empty: wait for a notification, a timeout, or shutdown.
        tokio::select! {
            _ = self.notify.notified() => continue,
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
            _ = self.cancel_token.cancelled() => {
                return Ok(ProducedMessages { schema: Schema::Raw, messages: vec![], state: None });
            }
        }
    }
}
```

---

## Configuration

```toml
type = "source"
key = "http"
enabled = true
name = "webhook_gateway"
path = "target/release/libiggy_connector_http_source"

[[streams]]
stream = "webhooks"
topics = ["github", "stripe", "generic"]
schema = "json"
batch_length = 100
linger_time = "5ms"

[plugin_config]
bind_address = "0.0.0.0"
port = 9090
max_body_size_bytes = 1048576          # 1MB
buffer_capacity = 10000
max_batch_size = 500

# Config topic (optional — enables endpoint registry + hot reload)
config_topic_stream = "platform"
config_topic_name = "http-source-config"
iggy_connection_string = "tcp://127.0.0.1:8090"

# Global auth (for /topics/{name} endpoints)
auth_bearer_token = "global-webhook-secret"

# Instance identity (for multi-instance setups)
instance_id = "webhook-01"

# HTTP metadata forwarding as Iggy message headers
include_http_metadata = true
forward_headers = ["X-Request-ID", "X-Correlation-ID"]
health_check_enabled = true
```

---

## Request/Response Contract

### Named Topic

```
POST /topics/github
Authorization: Bearer global-webhook-secret

{"event": "push", "repository": "apache/iggy"}
```
```
200 OK  {"status": "queued", "topic": "github", "queue_depth": 42}
```

### Ephemeral Endpoint

```
POST /e/a3f8c2e1b9d04f7a8e6c1d2b3a4f5e6d
X-Hub-Signature-256: sha256=abc123...

{"event": "push", "repository": "apache/iggy"}
```
```
200 OK  {"status": "queued", "endpoint": "a3f8c2e1...", "topic": "github_events"}
```

### Error Responses

| Status | Condition |
|--------|-----------|
| 413 | Body exceeds max size |
| 401 | Auth failure (bearer or HMAC) |
| 404 | Unknown topic or revoked endpoint |
| 410 | Expired endpoint |
| 429 | Buffer full (backpressure) + `Retry-After: 1` |
| 503 | Not ready (startup config replay in progress) |

### Health

```
GET /health

200 OK
{"status": "ok", "instance_id": "webhook-01", "buffer_used": 42,
 "buffer_capacity": 10000, "named_topics": ["github", "stripe"],
 "ephemeral_endpoints": 7, "config_offset": 234}
```

---

## HMAC Validation

Per-endpoint HMAC validation over raw request body bytes (never re-serialized 
JSON):

- **SHA-256** — GitHub, Stripe, most modern providers
- **SHA-1** — Legacy providers

Validation uses `ring::hmac::verify`, which compares tags in constant time. The 
scheme is configured per ephemeral endpoint:

```json
{"action": "register", "endpoint_id": "...", "topic": "github_events",
 "auth_type": "hmac-sha256", "hmac_header": "X-Hub-Signature-256",
 "hmac_prefix": "sha256=", "auth_secret": "whsec_..."}
```

Defense in depth: URL is unguessable (128-bit) AND HMAC signature must be 
valid. Either can be rotated independently.
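
Before the tag can be verified it has to be extracted from the header value. 
The sketch below shows that step only, with standard-library code; 
`parse_signature`, `decode_hex`, and `hex_nibble` are hypothetical helpers, and 
accepting both upper- and lowercase hex is an assumption. The decoded bytes 
would then be handed to `ring::hmac::verify` together with the key and the raw 
request body.

```rust
/// Converts one ASCII hex digit to its value; `None` for anything else.
fn hex_nibble(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}

/// Decodes a hex string into bytes; `None` on odd length or non-hex input.
fn decode_hex(hex: &str) -> Option<Vec<u8>> {
    let bytes = hex.as_bytes();
    if bytes.len() % 2 != 0 {
        return None;
    }
    bytes
        .chunks(2)
        .map(|pair| Some(hex_nibble(pair[0])? << 4 | hex_nibble(pair[1])?))
        .collect()
}

/// Extracts the raw tag from a header value such as `sha256=abc123...`.
/// `hmac_prefix` corresponds to the per-endpoint `hmac_prefix` setting.
fn parse_signature(header_value: &str, hmac_prefix: &str) -> Option<Vec<u8>> {
    decode_hex(header_value.strip_prefix(hmac_prefix)?)
}
```

A `None` result would map to the same 401 response as a failed verification, so 
a malformed header reveals nothing beyond "auth failure".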

---

## Async Acknowledgment

Respond before producing to Iggy:

```
HTTP POST → validate → enqueue (CAS) → HTTP 200    ~15-50μs
                                          │
               poll() drains queue ───────┘  async, ~1-5ms later
               Iggy producer sends              batched
```

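Inside the connector, delivery is at-most-once: 200 means "queued locally," not 
"persisted to Iggy," so a payload that is acknowledged but not yet produced is 
lost if the connector crashes. Webhook senders typically retry on timeouts and 
non-2xx responses, which gives at-least-once delivery up to the point of 
acknowledgment.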

---

## Comparison: HTTP Sink vs HTTP Source

| Aspect | Sink (#2925) | Source (this proposal) |
|--------|-------------|----------------------|
| Direction | Iggy → HTTP | HTTP → Iggy |
| HTTP role | Client | Server |
| SDK trait | Sink (consume) | Source (poll) |
| Port binding | No | Yes |
| Retry | Connector retries to endpoint | Caller retries to connector |
| Dynamic config | Static TOML | Event-sourced via Iggy topic |
| Dependencies | reqwest, reqwest-middleware | axum, crossbeam, arc-swap, ring |

---

## Architectural Questions for the Team

1. **Own IggyClient inside the connector**: The config consumer creates its own 
`IggyClient` to consume the config topic — separate from the runtime's producer 
pipeline. No existing connector does this. Is this acceptable, or would you 
prefer a runtime-level mechanism for delivering dynamic config to plugins?

2. **Config topic convention**: We propose `platform/http-source-config` for 
the endpoint registry. Is there an established naming convention for 
infrastructure topics?

3. **Server-style source**: This is the first push-based source in the 
ecosystem. All existing sources are poll-based. Any concerns about the 
push-to-pull bridge pattern (HTTP server → bounded queue → poll drains)?

4. **axum**: Already used in the Iggy server and runtime API. Confirmed as the 
right choice for the HTTP server?

5. **Multi-stream production**: The runtime's `spawn_source_handler` creates 
one producer for the last `[[streams]]` entry. Should we target single-stream 
multi-topic, or would it be worth addressing the multi-stream limitation in the 
runtime as a separate PR?

---

## Scope

### Included

- Embedded HTTP server (axum)
- Named topic routing (`/topics/{name}`) + ephemeral endpoints (`/e/{id}`)
- Event-sourced endpoint registry via Iggy config topic
- Hot reload (register/revoke/update endpoints without restart)
- HMAC validation (SHA-256, SHA-1) + bearer token auth
- Lock-free bounded buffer with HTTP 429 backpressure
- Multi-instance coordination via shared config topic
- Health/readiness endpoint
- HTTP metadata forwarding as Iggy message headers
- Structured concurrency (CancellationToken)
- Graceful shutdown within 5s

### Not Included

- TLS termination (use reverse proxy)
- Rate limiting per IP (LB layer)
- WebSocket support (different protocol)
- Batch POST (follow-up enhancement)
- mTLS (use sidecar proxy)
- Config topic compaction (not needed at config-event volumes)
- RBAC / Authorization — deliberate non-goal (see below)

---

## RBAC: Deliberate Non-Goal

We considered and explicitly rejected building RBAC (Role-Based Access Control) 
into the connector. The connector performs **authentication** (verify the 
caller is who they claim) but delegates **authorization** (is this caller 
allowed to perform this action) to the layers above and below it.

The connector sits between two systems that already have their own access 
control:

```
HTTP caller → [API Gateway / LB with RBAC] → Connector → [Iggy server with user permissions] → Topic
```

Adding a third authorization layer in the middle creates configuration 
complexity (three places to manage permissions with no clear source of truth), 
consistency headaches (permissions across layers diverge), and scope creep 
(role definitions, permission grants, group management — an entire 
authorization system that would dwarf the connector).

**How authorization is already handled at each layer**:

| Authorization Concern | Handled By | Mechanism |
|---|---|---|
| Can this caller POST to this endpoint? | API Gateway / LB | IP allowlists, OAuth scopes, WAF rules |
| Can this caller use this specific ephemeral endpoint? | Connector (authentication) | URL is a 128-bit secret + optional HMAC |
| Can this connector produce to this topic? | Iggy server | User permissions on streams/topics |
| Can this user register/revoke endpoints? | Iggy server | Write permission on config topic |

**Key insight**: Ephemeral endpoints with per-endpoint auth are functionally 
equivalent to per-resource authorization. Each integration gets its own 
endpoint with its own secret. Revoking the endpoint (via config topic) is 
equivalent to revoking access. No role hierarchy needed.

**What would change this**: If Iggy develops a first-party connector 
authorization framework (e.g., connector-level ACLs managed by the server), the 
HTTP source should integrate with it rather than maintaining its own.

---

## Estimated Size

~2500-3100 lines across `lib.rs`, `registry.rs`, `config.rs`, tests, README, 
and integration tests.

---

Looking forward to your feedback on the design, especially the architectural 
questions above. Happy to discuss any aspect in detail.


GitHub link: 
https://github.com/apache/iggy/discussions/3039#discussioncomment-16336298
