[ 
https://issues.apache.org/jira/browse/KAFKA-6595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383062#comment-16383062
 ] 

huxihx commented on KAFKA-6595:
-------------------------------

Could we simply cancel the flush if we find that another thread has already 
initiated one?
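
To make the suggestion concrete, here is a minimal sketch of a "skip if a flush is already in progress" guard. The class FlushGuard and its methods tryBeginFlush() and endFlush() are hypothetical names made up for illustration. They are not part of OffsetStorageWriter or any other Kafka class, and the sketch only shows the compare-and-set idea, not an actual patch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; this is NOT Kafka's OffsetStorageWriter.
public class FlushGuard {
    // True while some thread owns an in-progress flush.
    private final AtomicBoolean flushing = new AtomicBoolean(false);

    /**
     * Atomically claims the flush. Returns true if this caller now owns it,
     * or false if another thread has already started one, instead of throwing.
     */
    public boolean tryBeginFlush() {
        return flushing.compareAndSet(false, true);
    }

    /** Marks the flush as finished, whether it completed or was cancelled. */
    public void endFlush() {
        flushing.set(false);
    }

    public static void main(String[] args) {
        FlushGuard guard = new FlushGuard();
        boolean periodicCommit = guard.tryBeginFlush(); // periodic committer claims the flush
        boolean shutdownCommit = guard.tryBeginFlush(); // shutdown path arrives while it is running
        System.out.println(periodicCommit + " " + shutdownCommit); // prints "true false"
        guard.endFlush();
        System.out.println(guard.tryBeginFlush()); // prints "true"
    }
}
```

With a guard like this the second caller would return early rather than hit the "already flushing" exception. It would presumably still need to wait for the in-progress flush before the producer is closed, otherwise the records could be aborted just as in the report.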

> Kafka connect commit offset incorrectly.
> ----------------------------------------
>
>                 Key: KAFKA-6595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6595
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Hanlin Liu
>            Priority: Major
>
> Version: ConfluentPlatform Kafka 3.2.0
> SourceTaskOffsetCommitter calls commitOffsets() and waits for all incomplete 
> records to be sent. If the task is stopped while this is in progress, 
> commitOffsets() is called again from the finally block in 
> WorkerSourceTask.execute(). That call logs {{Invalid call to 
> OffsetStorageWriter flush() while already flushing, the framework should not 
> allow this}} and throws a ConnectException. This causes the Producer to be 
> closed without waiting for the flush timeout.
> After 30 seconds, all incomplete records have been forcefully aborted. If 
> {{offset.flush.timeout.ms}} is configured to be larger than 30 seconds, 
> WorkerSourceTask treats those aborted records as sent within the flush 
> timeout, which results in the source offset being flushed incorrectly.
>  
> {code:java}
> 2018-02-27 02:59:33,134 INFO  [] Stopping connector 
> dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:254]
> 2018-02-27 02:59:33,134 INFO  [] Stopped connector 
> dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:264]
> 2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() 
> while already flushing, the framework should not allow this   
> [pool-1-thread-13][OffsetStorageWriter.java:110]
> 2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 
> threw an uncaught and unrecoverable exception   
> [pool-1-thread-13][WorkerTask.java:141]
> org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is 
> already flushing
>         at 
> org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2018-02-27 03:00:00,734 ERROR [] Graceful stop of task 
> dp-sqlserver-connector-dptask_455-0 failed.   
> [pool-3-thread-1][Worker.java:405]
> 2018-02-27 03:00:04,126 INFO  [] Proceeding to force close the producer since 
> pending requests could not be completed within timeout 30 ms.   
> [pool-1-thread-13][KafkaProducer.java:713]
> 2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed 
> to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {}   
> [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
> java.lang.IllegalStateException: Producer is closed forcefully.
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> 2018-02-27 03:00:09,920 INFO  [] Finished 
> WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets 
> successfully in 47088 ms   [pool-4-thread-1][WorkerSourceTask.java:371]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
