[ 
https://issues.apache.org/jira/browse/KAFKA-9793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9793:
-------------------------------
    Issue Type: Bug  (was: Improvement)

> Stream HandleAssignment should guarantee task close
> ---------------------------------------------------
>
>                 Key: KAFKA-9793
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9793
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.6.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> When the `handleAssignment` call is triggered and a task's preCommit throws, 
> the doomed-to-fail task is never closed. Its RocksDB metrics recorder is 
> therefore never removed, which makes the later re-addition of the same 
> recorder fatal:
>  
>  
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,668] INFO 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle 
> new assignment with:
>         New active tasks: [1_0, 0_1, 2_0]
>         New standby tasks: []
>         Existing active tasks: [0_1, 1_0, 2_0, 3_1]
>         Existing standby tasks: [] 
> (org.apache.kafka.streams.processor.internals.TaskManager)
>  
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,671] INFO 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task 
> [3_1] Prepared clean close 
> (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,671] INFO 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task 
> [0_1] Prepared task for committing 
> (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 
> 23:50:42,682] ERROR 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] 
> stream-thread 
> [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task 
> [1_0] Failed to flush state store logData10MinuteFinalCount-store:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) 
> org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
> sending record to topic windowed-node-counts for task 1_0 due to:
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) 
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
> operation with an old epoch. Either there is a newer producer with the same 
> transactionalId, or the producer's transaction has been expired by the broker.
> [2020-03-31T16:50:43-07:00] 
> (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets 
> would not be recorded and no more records would be sent since the producer is 
> fenced, indicating the task may be migrated out; it means all tasks belonging 
> to this thread should be migrated.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.lang.Thread.run(Thread.java:748)
>  
> The fix is to wrap the whole close block in a try-catch so that every task 
> is still closed, even when its preCommit throws unexpectedly.
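>
> A minimal, self-contained sketch of that guarded-close pattern (the Task 
> interface below is a simplified stand-in for illustration, not the actual 
> org.apache.kafka.streams.processor.internals.Task API):
>
>     import java.util.List;
>
>     interface Task {
>         void prepareCommit();  // may throw, e.g. TaskMigratedException
>         void closeClean();     // commit, then release state stores
>         void closeDirty();     // release state stores without committing
>     }
>
>     final class GuardedClose {
>         // Close every task even if some prepareCommit calls throw; rethrow
>         // the first error only after all tasks have released their
>         // resources, so each RocksDB metrics recorder is always removed.
>         static void closeAll(final List<Task> tasks) {
>             RuntimeException firstError = null;
>             for (final Task task : tasks) {
>                 try {
>                     task.prepareCommit();
>                     task.closeClean();
>                 } catch (final RuntimeException e) {
>                     task.closeDirty();
>                     if (firstError == null) {
>                         firstError = e;
>                     }
>                 }
>             }
>             if (firstError != null) {
>                 throw firstError;
>             }
>         }
>     }
>
> With this shape, even a ProducerFencedException during preCommit still ends 
> in closeDirty(), so the task's resources are released before it is 
> re-created.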



--
This message was sent by Atlassian Jira
(v8.3.4#803005)