Hanlin Liu created KAFKA-6595:
---------------------------------
Summary: Kafka connect commit offset incorrectly.
Key: KAFKA-6595
URL: https://issues.apache.org/jira/browse/KAFKA-6595
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.10.2.0
Reporter: Hanlin Liu
SourceTaskOffsetCommitter calls commitOffsets() and waits for all incomplete
records to be sent. If the task is stopped while that commit is still waiting,
commitOffsets() is called again from the finally block in
WorkerSourceTask.execute(). The second call logs {{Invalid call
to OffsetStorageWriter flush() while already flushing, the framework should not
allow this}} and throws a ConnectException. This triggers closing the Producer
without waiting for the flush timeout.
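The overlapping commits described above can be modeled with a minimal sketch. This is not the real Kafka Connect code: {{SketchOffsetWriter}} and {{DoubleCommitSketch}} are hypothetical names, and {{IllegalStateException}} stands in for the ConnectException seen in the log. It only illustrates why a second flush attempt fails while the first one is still in flight.

```java
// Simplified, hypothetical model of the "already flushing" guard.
// These classes are illustrative only and are NOT the real Kafka Connect classes.
final class SketchOffsetWriter {
    private boolean flushing = false;

    // Rejects a new flush while a previous one has not finished.
    synchronized void beginFlush() {
        if (flushing) {
            // Stand-in for the ConnectException thrown by the real writer.
            throw new IllegalStateException("OffsetStorageWriter is already flushing");
        }
        flushing = true;
    }

    // Marks the in-flight flush as finished so a new one may start.
    synchronized void finishFlush() {
        flushing = false;
    }
}

final class DoubleCommitSketch {
    // Returns true if the second commit attempt is rejected while the first is in flight.
    static boolean secondCommitRejected() {
        SketchOffsetWriter writer = new SketchOffsetWriter();
        // Periodic committer starts a flush and is still waiting on unsent records.
        writer.beginFlush();
        try {
            // Shutdown path (the finally block in the report) tries to commit again.
            writer.beginFlush();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second commit rejected: " + secondCommitRejected());
    }
}
```

In this model the failure in the shutdown path is what lets the task proceed to closing the producer while the first commit is still waiting.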
After 30 seconds, all incomplete records have been forcefully aborted. If
{{offset.flush.timeout.ms}} is configured to be larger than 30 seconds,
WorkerSourceTask treats those aborted records as sent within the flush
timeout, and therefore flushes the source offsets incorrectly.
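The timing problem can be illustrated with a small sketch. It assumes, as the report describes, that an aborted send is removed from the set of outstanding records in the same way as a successful one. {{OutstandingRecordsSketch}} and its methods are hypothetical and do not mirror the real WorkerSourceTask. The 60000 ms and 5000 ms timeouts are example values only.

```java
// Hypothetical model of the bookkeeping described in the report.
// Not the real WorkerSourceTask; names and values are illustrative.
final class OutstandingRecordsSketch {
    private int outstanding = 0;

    // A record has been handed to the producer and is not yet acknowledged.
    void send() {
        outstanding++;
    }

    // Completion callback. In this model the error is ignored for bookkeeping,
    // so an aborted record is counted the same as a delivered one.
    void onCompletion(Exception error) {
        outstanding--;
    }

    // The commit goes ahead if nothing is outstanding before the flush timeout expires.
    boolean wouldCommit(long elapsedMs, long flushTimeoutMs) {
        return outstanding == 0 && elapsedMs <= flushTimeoutMs;
    }

    // Simulates two records being aborted 30 s after the commit started waiting.
    static boolean commitsAfterAbort(long flushTimeoutMs) {
        OutstandingRecordsSketch task = new OutstandingRecordsSketch();
        task.send();
        task.send();
        long abortAtMs = 30_000L; // forced producer close, per the report
        Exception aborted = new IllegalStateException("Producer is closed forcefully.");
        task.onCompletion(aborted);
        task.onCompletion(aborted);
        return task.wouldCommit(abortAtMs, flushTimeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(commitsAfterAbort(60_000L)); // timeout above 30 s
        System.out.println(commitsAfterAbort(5_000L));  // timeout below 30 s
    }
}
```

With a flush timeout above 30 seconds the model commits offsets for records that were never delivered. With a shorter timeout the commit has already timed out by the time the records are aborted.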
{code:java}
2018-02-27 02:59:33,134 INFO [] Stopping connector dp-sqlserver-connector-dptask_455 [pool-3-thread-6][Worker.java:254]
2018-02-27 02:59:33,134 INFO [] Stopped connector dp-sqlserver-connector-dptask_455 [pool-3-thread-6][Worker.java:264]
2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this [pool-1-thread-13][OffsetStorageWriter.java:110]
2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 threw an uncaught and unrecoverable exception [pool-1-thread-13][WorkerTask.java:141]
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:00,734 ERROR [] Graceful stop of task dp-sqlserver-connector-dptask_455-0 failed. [pool-3-thread-1][Worker.java:405]
2018-02-27 03:00:04,126 INFO [] Proceeding to force close the producer since pending requests could not be completed within timeout 30 ms. [pool-1-thread-13][KafkaProducer.java:713]
2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {} [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
    at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:09,920 INFO [] Finished WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets successfully in 47088 ms [pool-4-thread-1][WorkerSourceTask.java:371]
{code}