[ https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen updated KAFKA-9994:
-------------------------------
    Description: 
We have seen a case where a TaskMigratedException gets thrown from 
taskManager.commit() and shuts down the stream thread. It should be caught 
and handled instead.

Looking at the stack trace, the TaskMigratedException was thrown from the 
prepareCommit() call inside the commit done while handling a corrupted task 
exception.
{code:java}
[2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 12:47:25,635] ERROR [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to send a record [stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task [1_1]]; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
        at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
        at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
{code}
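
The trace above shows the fenced-producer error escaping through TaskManager.commit(). Below is a minimal sketch of what "proper catching" might look like, assuming the commit runs inside the handler for a corrupted-task exception and that a migration is handled by giving up all tasks. Every type here (TaskManagerSketch, StreamThreadSketch, FakeTaskManager, the handleLostAll() method, and the two local exception classes) is a hypothetical stand-in written for illustration, not the real Kafka Streams classes and not the actual fix.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: a task is reported corrupted, then the commit gets fenced.
class CorruptionPathDemo {
    public static void main(String[] args) {
        FakeTaskManager tm = new FakeTaskManager(true, true);
        String outcome = StreamThreadSketch.runOnce(tm);
        System.out.println(outcome + " " + tm.events);
    }
}

// Hypothetical stand-ins; NOT the real org.apache.kafka.streams.errors classes.
class TaskMigratedException extends RuntimeException {
    TaskMigratedException(String message) { super(message); }
}

class TaskCorruptedException extends RuntimeException {
    TaskCorruptedException(String message) { super(message); }
}

// Hypothetical, simplified view of the operations the thread needs (assumption).
interface TaskManagerSketch {
    void process();            // may throw TaskCorruptedException
    void commitNonCorrupted(); // may throw TaskMigratedException, e.g. fenced while flushing
    void reviveCorrupted();    // reset the corrupted tasks
    void handleLostAll();      // give up all tasks so the thread can rejoin
}

class StreamThreadSketch {
    // One loop iteration. A migration is handled the same way whether it is
    // raised by normal processing or by the commit inside the corruption handler.
    static String runOnce(TaskManagerSketch tm) {
        try {
            tm.process();
            return "processed";
        } catch (TaskCorruptedException corrupted) {
            try {
                tm.commitNonCorrupted();
                tm.reviveCorrupted();
                return "revived";
            } catch (TaskMigratedException migrated) {
                // Without this inner catch the exception would escape the handler.
                tm.handleLostAll();
                return "migrated";
            }
        } catch (TaskMigratedException migrated) {
            tm.handleLostAll();
            return "migrated";
        }
    }
}

// Test double that records which operations were invoked, in order.
class FakeTaskManager implements TaskManagerSketch {
    final List<String> events = new ArrayList<>();
    private final boolean corruptOnProcess;
    private final boolean fenceOnCommit;

    FakeTaskManager(boolean corruptOnProcess, boolean fenceOnCommit) {
        this.corruptOnProcess = corruptOnProcess;
        this.fenceOnCommit = fenceOnCommit;
    }

    public void process() {
        events.add("process");
        if (corruptOnProcess) throw new TaskCorruptedException("task corrupted");
    }

    public void commitNonCorrupted() {
        events.add("commitNonCorrupted");
        if (fenceOnCommit) throw new TaskMigratedException("producer got fenced");
    }

    public void reviveCorrupted() { events.add("reviveCorrupted"); }

    public void handleLostAll() { events.add("handleLostAll"); }
}
```

The nested try is the point of the sketch: a catch clause does not cover exceptions thrown from a sibling catch block of the same try statement, so the commit inside the corruption handler needs its own handling.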

  was:We have seen a case where the TaskMigrated exception gets thrown from 
taskManager.commit(). This should be prevented by proper catching.


> Catch TaskMigrated exception in task corruption code path 
> ----------------------------------------------------------
>
>                 Key: KAFKA-9994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)