Rohan Desai created KAFKA-16876:
-----------------------------------

             Summary: TaskManager.handleRevocation doesn't handle errors thrown 
from task.prepareCommit
                 Key: KAFKA-16876
                 URL: https://issues.apache.org/jira/browse/KAFKA-16876
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.6.0
            Reporter: Rohan Desai


`TaskManager.handleRevocation` does not handle exceptions thrown by 
`task.prepareCommit`. In the instance I observed, `prepareCommit` 
flushed caches, which led to downstream `producer.send` calls that threw a 
`TaskMigratedException`. As a result, the tasks that need to be revoked are 
not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown 
exception and then moves on to the other task assignment callbacks. One of 
these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks and 
raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if 
the clean close fails, so we don't leak any tasks. I think there may be two 
bugs here:
 # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It 
should try not to leave any revoked tasks in an unsuspended state.
 # The `ConsumerCoordinator` just throws the first exception that it sees. But 
it seems bad to throw the `TaskMigratedException` and drop the 
`IllegalStateException` (though in this case I think it's relatively benign). I 
think on `IllegalStateException` we really want the streams thread to exit. One 
idea here is to have `ConsumerCoordinator` throw an exception type that 
includes the other exceptions that it has seen in another field. But this 
breaks the contract for clients that catch specific exceptions. I'm not sure of 
a clean solution, but I think it's at least worth recording that it would be 
preferable to have the caller of `poll` handle all the thrown exceptions rather 
than just the first one.
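
To make the two points above concrete, here is a minimal sketch of one possible shape for the fix. It is not the actual Kafka Streams code: `SketchTask`, `FakeTask`, `record` and this `handleRevocation` are hypothetical names invented for illustration, and it assumes that suspending a task is still safe after its `prepareCommit` has failed. Each revoked task is suspended even when `prepareCommit` throws, the first exception is rethrown at the end so callers that catch a specific type still see it, and later exceptions are attached with the standard `Throwable.addSuppressed` instead of being dropped.

```java
import java.util.List;

public class RevocationSketch {

    /** Hypothetical stand-in for a Streams task; NOT the real Kafka Streams Task interface. */
    interface SketchTask {
        void prepareCommit();   // may throw, e.g. when a cache flush triggers a failing send
        void suspend();
        boolean isSuspended();
    }

    /** Simple fake task whose prepareCommit can be made to fail (illustration only). */
    static final class FakeTask implements SketchTask {
        private final String id;
        private final boolean failOnPrepare;
        private boolean suspended = false;

        FakeTask(String id, boolean failOnPrepare) {
            this.id = id;
            this.failOnPrepare = failOnPrepare;
        }

        @Override
        public void prepareCommit() {
            if (failOnPrepare) {
                throw new IllegalStateException("simulated prepareCommit failure for task " + id);
            }
        }

        @Override
        public void suspend() {
            suspended = true;
        }

        @Override
        public boolean isSuspended() {
            return suspended;
        }
    }

    /** Keeps the first exception as primary and attaches any later one as suppressed. */
    static RuntimeException record(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Attempts prepareCommit and then suspend for every revoked task.
     * A failure in one task does not stop the others from being suspended.
     */
    static void handleRevocation(List<? extends SketchTask> revokedTasks) {
        RuntimeException first = null;
        for (SketchTask task : revokedTasks) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
            try {
                // Assumption: suspending is still valid after a failed prepareCommit.
                task.suspend();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
        }
        if (first != null) {
            throw first;   // original exception type is preserved for the caller
        }
    }

    public static void main(String[] args) {
        List<FakeTask> tasks = List.of(
            new FakeTask("0_1", true),
            new FakeTask("0_2", false),
            new FakeTask("0_3", true));
        try {
            handleRevocation(tasks);
        } catch (RuntimeException e) {
            System.out.println("first failure: " + e.getMessage());
            System.out.println("suppressed failures: " + e.getSuppressed().length);
        }
        for (FakeTask t : tasks) {
            System.out.println(t.id + " suspended=" + t.isSuspended());
        }
    }
}
```

Using suppressed exceptions keeps the type of the first exception intact, so it would not break clients that catch specific exceptions, but the caller of `poll` would have to inspect `getSuppressed()` to notice something like the `IllegalStateException`. Whether that trade-off is acceptable for `ConsumerCoordinator` is an open question, not something this sketch settles.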

 

Here is the IllegalStateException stack trace I observed:
{code:java}
[       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
[e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] 
Failed to close task 0_3 cleanly. Attempting to close remaining 
tasks before re-throwing:
[       508.535] [service_application2] [inf] java.lang.IllegalStateException: 
Illegal state RUNNING while closing active task 0_3
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
 ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
 ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
 ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) 
[kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) 
[kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94)
 [kafka-client-0.24.0-dc9acd1.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 [kafka-streams-3.6.0.jar:?] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
