Rohan Desai created KAFKA-16876:
-----------------------------------

             Summary: TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
                 Key: KAFKA-16876
                 URL: https://issues.apache.org/jira/browse/KAFKA-16876
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.6.0
            Reporter: Rohan Desai

`TaskManager.handleRevocation` does not handle exceptions thrown by `task.prepareCommit`. In the particular instance I observed, `prepareCommit` flushed caches, which led to downstream `producer.send` calls that threw a `TaskMigratedException`. As a result, the tasks that need to be revoked are not suspended by `handleRevocation`.

`ConsumerCoordinator` stores the thrown exception and then moves on to the other task assignment callbacks. One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks and raises an `IllegalStateException` because they are still in the `RUNNING` state. Fortunately, it dirty-closes the tasks if the clean close fails, so we don't leak any tasks.

I think there may be two bugs here:

# `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It should try not to leave any revoked tasks in an unsuspended state.
# The `ConsumerCoordinator` just throws the first exception that it sees. But it seems bad to throw the `TaskMigratedException` and drop the `IllegalStateException` (though in this case I think it's relatively benign). I think on `IllegalStateException` we really want the streams thread to exit. One idea here is to have `ConsumerCoordinator` throw an exception type that includes the other exceptions that it has seen in another field. But this breaks the contract for clients that catch specific exceptions. I'm not sure of a clean solution, but I think it's at least worth recording that it would be preferable to have the caller of `poll` handle all the thrown exceptions rather than just the first one.

Here is the IllegalStateException stack trace I observed:

{code:java}
[ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly.
Attempting to close remaining tasks before re-throwing:
[ 508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94) [kafka-client-0.24.0-dc9acd1.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.6.0.jar:?]
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
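
To make the first suggested fix concrete, here is a minimal, self-contained sketch of a revocation loop that keeps going when `prepareCommit` throws, so that every revoked task still ends up suspended. It is not the actual `TaskManager` code: `RevocationSketch`, the `Task` interface, `FakeTask`, and `revokeAll` are hypothetical names invented for this illustration. It also does not model offset commits or the real task state machine. Attaching later failures through `Throwable.addSuppressed` is just one possible way to avoid dropping exceptions while still throwing the original exception type; the report itself does not propose it.

```java
import java.util.List;

public class RevocationSketch {

    /** Hypothetical stand-in for a Streams task; NOT the real Kafka Streams Task interface. */
    interface Task {
        String id();
        void prepareCommit(); // may throw, e.g. when a cache flush leads to a failed send
        void suspend();
        boolean isSuspended();
    }

    /** In-memory task used only to exercise the sketch. */
    static final class FakeTask implements Task {
        private final String id;
        private final boolean failOnPrepare;
        private boolean suspended = false;

        FakeTask(String id, boolean failOnPrepare) {
            this.id = id;
            this.failOnPrepare = failOnPrepare;
        }

        @Override public String id() { return id; }

        @Override public void prepareCommit() {
            if (failOnPrepare) {
                throw new IllegalStateException("prepareCommit failed for task " + id);
            }
        }

        @Override public void suspend() { suspended = true; }

        @Override public boolean isSuspended() { return suspended; }
    }

    /** Keeps the first failure as the primary exception and attaches later ones as suppressed. */
    private static RuntimeException record(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Tries prepareCommit on every revoked task, then suspends the task whether or not
     * prepareCommit succeeded. The first failure is rethrown only after all tasks were visited.
     */
    static void revokeAll(List<? extends Task> revoked) {
        RuntimeException first = null;
        for (Task task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
            try {
                task.suspend();
            } catch (RuntimeException e) {
                first = record(first, e);
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<FakeTask> tasks = List.of(
            new FakeTask("0_1", false),
            new FakeTask("0_3", true),
            new FakeTask("0_5", true));
        try {
            revokeAll(tasks);
        } catch (RuntimeException e) {
            System.out.println("rethrown: " + e.getMessage());
            System.out.println("suppressed count: " + e.getSuppressed().length);
        }
        for (FakeTask t : tasks) {
            System.out.println(t.id() + " suspended=" + t.isSuspended());
        }
    }
}
```

A caller that catches the primary exception type is unaffected, while a caller that wants every failure can inspect `getSuppressed()`. Whether a task whose `prepareCommit` failed should be suspended or closed dirty in the real code is a separate design question that this sketch does not answer.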