Hi,

We are running the Confluent S3 connector (3.2.0) and observed a sink task
that was unable to commit offsets for about a week after a rebalance. It logged
"WorkerSinkTask:337 - Ignoring invalid task provided offset -- partition
not assigned" every time a new file was written to S3. After 7 days
another rebalance happened and the offsets were reset to where they had
been a week earlier. Only 1 task was affected.
After looking through the code, the only theory I could come up with is
that a callback for an async commit was executed after the rebalance and
wiped the task state.
It looks like https://issues.apache.org/jira/browse/KAFKA-5731. This
scenario is explicitly mentioned here:
https://github.com/apache/kafka/blob/0.11.0.1/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java#L647
However, after some digging through the code and debugging, I still do not
quite understand when this can happen. It does not look possible given
the way NetworkClient works (it appears to serialize all requests and
responses:
https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L653).
Could anyone shed some light on this: how can an async commit handler
from a previous "epoch" be invoked after a rebalance has completed?
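
To make the theory concrete, here is a minimal sketch of the ordering I
have in mind. It is not the Connect code: the class, its fields and the
generation counter are all hypothetical, and it only models a commit
callback that completes after the assignment has changed. I am assuming
the callback replaces the task's view of committed offsets wholesale,
which is the part I have not been able to confirm.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the suspected race; none of these names come from
// Kafka Connect or the consumer client.
class StaleCommitCallbackSketch {
    // Offsets the task believes are committed, keyed by partition name.
    final Map<String, Long> committed = new HashMap<>();
    // Callbacks of async commits that were sent but have not completed yet.
    final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();
    // Incremented on every rebalance; stands in for the "epoch".
    int generation = 0;
    // When true, callbacks from an older generation are dropped.
    final boolean guardEnabled;

    StaleCommitCallbackSketch(boolean guardEnabled) {
        this.guardEnabled = guardEnabled;
    }

    // Queue an async commit; its callback runs later, not immediately.
    void commitAsync(Map<String, Long> offsets) {
        final int sentIn = generation;
        final Map<String, Long> snapshot = new HashMap<>(offsets);
        pendingCallbacks.add(() -> {
            if (guardEnabled && sentIn != generation) {
                return; // completion belongs to a previous generation
            }
            // Assumed behaviour: the callback overwrites the task state.
            committed.clear();
            committed.putAll(snapshot);
        });
    }

    // A rebalance starts a new generation and replaces the assignment.
    void rebalance(Map<String, Long> newAssignment) {
        generation++;
        committed.clear();
        committed.putAll(newAssignment);
    }

    // Run every callback that is still outstanding.
    void completePendingCommits() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }

    // Commit in generation 1, rebalance, then let the old callback fire.
    static Map<String, Long> run(boolean guardEnabled) {
        StaleCommitCallbackSketch task = new StaleCommitCallbackSketch(guardEnabled);
        task.rebalance(Map.of("topic-0", 100L));
        task.commitAsync(Map.of("topic-0", 150L));
        task.rebalance(Map.of("topic-1", 500L));
        task.completePendingCommits();
        return task.committed;
    }

    public static void main(String[] args) {
        System.out.println("without guard: " + run(false));
        System.out.println("with guard:    " + run(true));
    }
}
```

Without the guard the task ends up tracking topic-0 again, a partition it
no longer owns, which would match the "partition not assigned" message.
What I cannot see is how the real client lets the callback run this late.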

Besides, if this problem happens in connectors, it looks like it should also
affect normal consumers that use async commit callbacks. Would that be
expected behaviour or a bug?
Also, the Connect framework currently does synchronous commits only upon task
termination. Are there any plans to introduce periodic synchronous commits?
If I want to monitor the last committed offset for a group, to detect
situations like this, do I have to consume __consumer_offsets or contact
the group coordinator for that group?
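
The check I would like to build on top of that is roughly the following.
This is only a sketch: the class and method names are hypothetical, and
where the committed offset and log end offset come from is exactly the
open question above, so they are plain parameters here.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stall check for one partition of one group. How the two
// offsets are fetched is deliberately left out.
class CommitStallDetector {
    private final Duration maxAge;
    private long lastCommitted = -1L;
    private Instant lastChange; // when the committed offset last changed

    CommitStallDetector(Duration maxAge) {
        this.maxAge = maxAge;
    }

    // Feed one observation; returns true if commits look stuck, meaning
    // the committed offset has not changed for longer than maxAge while
    // the log still has records beyond it.
    boolean observe(long committedOffset, long logEndOffset, Instant now) {
        if (lastChange == null || committedOffset != lastCommitted) {
            lastCommitted = committedOffset;
            lastChange = now;
            return false;
        }
        boolean hasBacklog = logEndOffset > committedOffset;
        boolean tooOld = Duration.between(lastChange, now).compareTo(maxAge) > 0;
        return hasBacklog && tooOld;
    }

    public static void main(String[] args) {
        CommitStallDetector detector = new CommitStallDetector(Duration.ofMinutes(10));
        Instant start = Instant.now();
        // First sample only records a baseline.
        System.out.println(detector.observe(100L, 120L, start));
        // Same committed offset 15 minutes later, with new data in the log.
        System.out.println(detector.observe(100L, 300L, start.plusSeconds(900)));
    }
}
```

In our case a check like this, with a threshold far below 7 days, would
have flagged the affected task long before the second rebalance.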

Best regards,
Stanislav
