GitHub user hubcio added a comment to the discussion: Proposal: Generic HTTP Sink Connector
hey @mlevkov, it always warms my heart when I see someone talking positively about iggy. I went through the full design doc and the codebase.

about the http sink: it looks well researched, and since you will be using it, it's even better that you will implement it. one thing is that the `individual` mode might be a perf problem at any real throughput - if you set `batch_length` to 50 with `individual` mode, you will have 50 sequential HTTP round trips per poll cycle. other than that, you have a green light from me for implementation (but create an issue first).

I have a bit more to add about the runtime issues (both confirmed, i checked in code):

1. `consume()` return value discarded: it is a legit bug, the `increment_errors()` metric method exists but is never called for sink consumption failures. interestingly, the source side has a similar issue at line 434 where the callback return is also discarded, though sources at least have some error tracking in the forwarding loop.
2. `AutoCommitWhen::PollingMessages` commits before processing: yep. due to this we can't achieve at-least-once delivery for any sink connector. unfortunately, the auto-commit strategy is also hardcoded in `sink.rs`. your proposed fix for retry logic is somewhat OK but it does not resolve the core issue: if the process crashes mid-consume, those messages are gone regardless of what happens inside the callback.
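to make point 2 concrete, here is a minimal toy model of the commit ordering. it does not use the iggy SDK at all: `ToyConsumer`, `CommitOrder`, `run_batch` and `lost_after_crash` are hypothetical names made up for illustration, and the "crash" is just an early return. it only shows why committing before the callback runs makes undelivered messages unrecoverable.

```rust
/// Where the offset commit happens relative to the sink callback.
/// (Hypothetical type, not part of the iggy SDK.)
#[derive(Clone, Copy, Debug, PartialEq)]
enum CommitOrder {
    /// Offset is stored as soon as the batch is polled.
    BeforeProcessing,
    /// Offset is stored only after the whole batch was delivered.
    AfterProcessing,
}

/// Toy consumer: `committed` is the offset the consumer resumes from after a restart.
struct ToyConsumer {
    committed: u64,
}

/// Polls one batch of `batch_len` offsets and delivers them one by one.
/// If `crash_after` is `Some(n)`, the process "crashes" after n deliveries.
/// Returns the offsets that actually reached the sink.
fn run_batch(
    consumer: &mut ToyConsumer,
    batch_len: u64,
    order: CommitOrder,
    crash_after: Option<u64>,
) -> Vec<u64> {
    let start = consumer.committed;
    let end = start + batch_len;
    if order == CommitOrder::BeforeProcessing {
        consumer.committed = end;
    }
    let mut delivered = Vec::new();
    for offset in start..end {
        if crash_after == Some(delivered.len() as u64) {
            // simulated crash mid-consume: nothing after this point runs
            return delivered;
        }
        delivered.push(offset);
    }
    if order == CommitOrder::AfterProcessing {
        consumer.committed = end;
    }
    delivered
}

/// Offsets that were never delivered and will not be redelivered after a restart,
/// because the stored offset has already moved past them.
fn lost_after_crash(order: CommitOrder, batch_len: u64, crash_after: u64) -> Vec<u64> {
    let mut consumer = ToyConsumer { committed: 0 };
    let delivered = run_batch(&mut consumer, batch_len, order, Some(crash_after));
    (0..consumer.committed)
        .filter(|offset| !delivered.contains(offset))
        .collect()
}
```

with a batch of 50 and a crash after 20 deliveries, `lost_after_crash(CommitOrder::BeforeProcessing, 50, 20)` returns offsets 20 to 49, while `CommitOrder::AfterProcessing` returns an empty list: the stored offset never moved, so the whole batch is polled again after restart and offsets 0 to 19 are delivered twice. that is the at-least-once trade-off.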
I think I have somewhat of a plan for proper resolution:

step 1 - visibility:

- capture the FFI return code in `process_messages()` instead of discarding it
- on non-zero: log an error with context (plugin_id, stream, topic, return code), increment the `errors` metric
- once we surface FFI errors, change the caller (`consume_messages()`) to `continue` on callback failure instead of `return Err(...)`, which kills the consumer loop permanently - one transient failure shouldn't brick the sink forever
- add a warn log at consumer setup about `PollingMessages` auto-commit semantics

this gives operators immediate visibility into failures that are currently invisible, without changing the FFI contract or auto-commit strategy. all existing sinks benefit.

step 2 - proper at-least-once delivery:

- enrich the FFI return codes so the runtime can distinguish transient from permanent errors (backwards-compatible -- existing plugins returning 0/1 still work)
- switch sink consumers to manual commit (`AutoCommit::Disabled`) and only commit after successful `consume()`. i checked the SDK - `ConsumingAllMessages` won't work either, it commits when the last buffered message is yielded from `.next()`, which is still before `process_messages()` runs (see `store_offset_after_all_messages` in `consumer.rs`). manual commit is the only correct option.
- add a consecutive failure circuit breaker so a broken sink doesn't loop forever
- expose commit strategy in config so users can opt into the old at-most-once behavior if they prefer lower latency (not sure about this)

step 1 is a standalone PR. step 2 probably splits into SDK changes and runtime changes - we can finalize the details in the tracking issues.

finally, answering your questions:

1. yes, open a tracking issue first, link this discussion.
2. the runtime bug fixes should be separate issues + separate PRs from the http sink itself. smaller PRs are easier to review and the fixes benefit all sinks immediately
3. skip `body_template` for v1, that's the right call. if someone needs custom envelopes, the transform system already exists for that
4. env var expansion in headers - worth considering but also fine as a follow-up. for v1, users can just set the literal value in config

and unfortunately I'm gonna end this overly long message with more open questions:

looking forward to seeing the issues and the PR!

GitHub link: https://github.com/apache/iggy/discussions/2919#discussioncomment-16088360
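for reference, the core of the step 1 and step 2 runtime changes described above could look roughly like the sketch below. this is not the actual connector runtime code: `SinkState`, `LoopAction` and `handle_batch` are hypothetical names, the metric and the offset commit are plain struct fields instead of real runtime/SDK calls, and the enriched transient vs permanent return codes are left out (any non-zero code is treated as a failure).

```rust
/// Per-sink bookkeeping. (Hypothetical struct; in the real runtime the
/// commit would be an SDK call and `errors` a metrics counter.)
#[derive(Debug, Default)]
struct SinkState {
    /// Last offset committed manually, i.e. after a successful consume.
    committed_offset: Option<u64>,
    /// Stand-in for the `errors` metric.
    errors: u64,
    /// Failures since the last success, used by the circuit breaker.
    consecutive_failures: u32,
}

/// What the consumer loop should do after one batch.
#[derive(Debug, PartialEq)]
enum LoopAction {
    /// Keep polling: either success, or a failure below the breaker threshold.
    Continue,
    /// Circuit breaker tripped: stop instead of looping forever.
    Stop,
}

/// Handles the FFI return code of one `consume()` call for a batch ending at `last_offset`.
/// `context` stands for the log context (plugin_id, stream, topic).
fn handle_batch(
    state: &mut SinkState,
    context: &str,
    last_offset: u64,
    return_code: i32,
    max_consecutive_failures: u32,
) -> LoopAction {
    if return_code == 0 {
        // commit only after the plugin reported success
        state.committed_offset = Some(last_offset);
        state.consecutive_failures = 0;
        return LoopAction::Continue;
    }

    // step 1: surface the failure instead of discarding the return code
    state.errors += 1;
    state.consecutive_failures += 1;
    eprintln!("sink consume failed: {}, return code {}", context, return_code);

    // step 2: no commit on failure, so the batch is redelivered later;
    // give up only after too many failures in a row
    if state.consecutive_failures >= max_consecutive_failures {
        LoopAction::Stop
    } else {
        LoopAction::Continue
    }
}
```

the important design point is that a single failed batch returns `LoopAction::Continue` without touching `committed_offset`, so one transient failure neither kills the loop nor skips messages, while a sink that keeps failing hits the threshold and stops.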
