[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851514#comment-17851514
 ] 

Ganesh Sadanala edited comment on KAFKA-16876 at 6/3/24 4:21 AM:
-----------------------------------------------------------------

[~rohanpd] Thank you for confirming it! As I understand it, the flow is:

`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap` 
-> `StreamTask#prepareCommit` -> `StreamTask#flush` -> 
`ProcessorStateManager#flushCache`

 

The registered state stores are iterated inside `ProcessorStateManager#flushCache`. 
In your case, a TaskMigratedException is thrown there and caught here:
{code:java}
catch (final RuntimeException exception) {
    if (firstException == null) {
        // do NOT wrap the error if it is actually caused by Streams itself
        if (exception instanceof StreamsException) {
            firstException = exception;
        } {code}
 

 

The exception is then rethrown to `StreamTask#flush`, where I see it is not 
caught or handled. Hence the whole flow fails with a runtime exception and the 
active tasks are not revoked. Please correct me if I am wrong.
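
To check that I am reading the catch block correctly, here is a minimal, 
stand-alone sketch of the "remember the first exception, keep flushing, rethrow 
at the end" pattern. Every class and method name in it (SketchStreamsException, 
SketchTaskMigratedException, FlushSketch, flushAll) is a hypothetical stand-in 
of mine, not the real Kafka Streams code, and the real method may differ in 
details:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the real Kafka Streams classes.
class SketchStreamsException extends RuntimeException {
    SketchStreamsException(String message) { super(message); }
    SketchStreamsException(String message, Throwable cause) { super(message, cause); }
}

class SketchTaskMigratedException extends SketchStreamsException {
    SketchTaskMigratedException(String message) { super(message); }
}

class FlushSketch {
    // Flushes every store, remembers only the first failure, and rethrows it
    // after all stores have been attempted. Names of successfully flushed
    // stores are appended to 'flushed'.
    static void flushAll(List<Runnable> stores, List<String> flushed) {
        RuntimeException firstException = null;
        int index = 0;
        for (Runnable store : stores) {
            try {
                store.run();
                flushed.add("store-" + index);
            } catch (RuntimeException exception) {
                if (firstException == null) {
                    // Do not wrap errors that are already Streams-level exceptions.
                    firstException = (exception instanceof SketchStreamsException)
                        ? exception
                        : new SketchStreamsException("Failed to flush store-" + index, exception);
                }
            }
            index++;
        }
        if (firstException != null) {
            // Nothing in this method handles it, so the caller has to.
            throw firstException;
        }
    }

    public static void main(String[] args) {
        List<Runnable> stores = new ArrayList<>();
        stores.add(() -> { throw new SketchTaskMigratedException("task migrated"); });
        stores.add(() -> { });
        List<String> flushed = new ArrayList<>();
        try {
            flushAll(stores, flushed);
        } catch (SketchStreamsException e) {
            System.out.println("caller saw: " + e.getClass().getSimpleName());
        }
        System.out.println("flushed: " + flushed);
    }
}
```

If this matches the real behaviour, the remaining stores are still flushed, but 
the first exception always escapes to whoever called the flush.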

 

So would you want the exception to be handled inside the `StreamTask#flush` 
method?
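
As an alternative for discussion, the handling could sit one level up in the 
revocation path, so that every revoked task is still suspended even if 
preparing the commit fails for one of them. The sketch below is only my 
assumption of what that could look like. SketchTask and RevocationSketch are 
hypothetical names, the real `TaskManager#handleRevocation` does much more, and 
whether a failed task should be suspended or closed dirty is left open here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified task; NOT the real StreamTask.
class SketchTask {
    final String id;
    final boolean failOnPrepareCommit;
    boolean suspended = false;

    SketchTask(String id, boolean failOnPrepareCommit) {
        this.id = id;
        this.failOnPrepareCommit = failOnPrepareCommit;
    }

    void prepareCommit() {
        if (failOnPrepareCommit) {
            throw new RuntimeException("simulated flush failure in task " + id);
        }
    }

    void suspend() {
        suspended = true;
    }
}

class RevocationSketch {
    // Tries prepareCommit on every revoked task, remembers the first failure,
    // suspends all revoked tasks regardless, and only then rethrows.
    static void handleRevocation(List<SketchTask> revokedTasks) {
        RuntimeException firstException = null;
        for (SketchTask task : revokedTasks) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        // Assumption: no revoked task should be left in an unsuspended state.
        for (SketchTask task : revokedTasks) {
            task.suspend();
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(String[] args) {
        List<SketchTask> tasks = new ArrayList<>();
        tasks.add(new SketchTask("0_1", false));
        tasks.add(new SketchTask("0_3", true));
        try {
            handleRevocation(tasks);
        } catch (RuntimeException e) {
            System.out.println("rethrown after suspending: " + e.getMessage());
        }
        for (SketchTask task : tasks) {
            System.out.println(task.id + " suspended=" + task.suspended);
        }
    }
}
```

With something like this, the later close in the assignment callback would no 
longer find a task still in RUNNING state, at least in this simplified model.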

 

Also, could you explain how you produced those exceptions? I would like to 
reproduce them locally to get a better picture.

 

Anything else you can share would be helpful.

 

Thank you!



> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-16876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16876
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Rohan Desai
>            Assignee: Ganesh Sadanala
>            Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the 
> tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the 
> tasks if close fails, so we don't leak any tasks. I think there may be two 
> bugs here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think it's relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think it's at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] 
> Failed to close task 0_3 cleanly. Attempting to close remaining tasks before 
> re-throwing:
> [       508.535] [service_application2] [inf] 
> java.lang.IllegalStateException: Illegal state RUNNING while closing active 
> task 0_3
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) 
> [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) 
> [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94)
>  [kafka-client-0.24.0-dc9acd1.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  [kafka-streams-3.6.0.jar:?] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
