[ https://issues.apache.org/jira/browse/KAFKA-6595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383062#comment-16383062 ]
huxihx commented on KAFKA-6595: ------------------------------- Could we simply cancel the flush if we find another thread has already initiated a flushing? > Kafka connect commit offset incorrectly. > ---------------------------------------- > > Key: KAFKA-6595 > URL: https://issues.apache.org/jira/browse/KAFKA-6595 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 0.10.2.0 > Reporter: Hanlin Liu > Priority: Major > > Version: ConfluentPlatform Kafka 3.2.0 > SourceTaskOffsetComitter calls commitOffset() and waits for all incomplete > records to be sent. While the task is stopped, commitOffset() is called again > by the final block in WorkerSourceTask.execute(), it will throw {{Invalid > call to OffsetStorageWriter flush() while already flushing, the framework > should not allow this}} exception. This will trigger closing Producer without > waiting the flush timeout. > After 30 seconds, all incomplete records has been forcefully aborted. If the > {{offset.flush.timeout.ms}} is configured larger than 30 seconds, > WorkerSourceTask will consider those aborted records as sent within flush > timeout, which results in incorrectly flushing the source offset. > > {code:java} > // code placeholder > 2018-02-27 02:59:33,134 INFO [] Stopping connector > dp-sqlserver-connector-dptask_455 [pool-3-thread-6][Worker.java:254] > 2018-02-27 02:59:33,134 INFO [] Stopped connector > dp-sqlserver-connector-dptask_455 [pool-3-thread-6][Worker.java:264] > 2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() > while already flushing, the framework should not allow this > [pool-1-thread-13][OffsetStorageWriter.java:110] > 2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 > threw an uncaught and unrecoverable exception > [pool-1-thread-13][WorkerTask.java:141] > org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is > already flushing > at > org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 2018-02-27 03:00:00,734 ERROR [] Graceful stop of task > dp-sqlserver-connector-dptask_455-0 failed. > [pool-3-thread-1][Worker.java:405] > 2018-02-27 03:00:04,126 INFO [] Proceeding to force close the producer since > pending requests could not be completed within timeout 30 ms. > [pool-1-thread-13][KafkaProducer.java:713] > 2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed > to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {} > [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228] > java.lang.IllegalStateException: Producer is closed forcefully. > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147) > at java.lang.Thread.run(Thread.java:745) > 2018-02-27 03:00:09,920 INFO [] Finished > WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets > successfully in 47088 ms [pool-4-thread-1][WorkerSourceTask.java:371] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)