[
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax reassigned KAFKA-16876:
---------------------------------------
Assignee: Lianet Magrans (was: Ganesh Sadanala)
> TaskManager.handleRevocation doesn't handle errors thrown from
> task.prepareCommit
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.6.0
> Reporter: Rohan Desai
> Assignee: Lianet Magrans
> Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit`
> flushed caches which led to downstream `producer.send` calls that threw a
> `TaskMigratedException`. This means that the tasks that need to be revoked
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the
> thrown exception and then moves on to the other task assignment callbacks.
> One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks
> if close fails so we don't leak any tasks. I think there are maybe two bugs
> here:
> # `TaskManager.handleRevocation` should handle errors from `prepareCommit`.
> It should try not to leave any revoked tasks in an unsuspended state.
> # The `ConsumerCoordinator` just throws the first exception that it sees.
> But it seems bad to throw the `TaskMigratedException` and drop the
> `IllegalStateException` (though in this case I think it's relatively benign).
> I think on `IllegalStateException` we really want the streams thread to exit.
> One idea here is to have `ConsumerCoordinator` throw an exception type that
> includes the other exceptions that it has seen in another field. But this
> breaks the contract for clients that catch specific exceptions. I'm not sure
> of a clean solution, but I think it's at least worth recording that it would
> be preferable to have the caller of `poll` handle all the thrown exceptions
> rather than just the first one.
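The first point in the list above could be sketched roughly as follows. This is a minimal illustration only, not the actual `TaskManager` code: the `Task` interface and the `RevocationSketch` class are hypothetical stand-ins, and the real revocation path also commits offsets between preparing and suspending. The idea is just that a failure in `prepareCommit` is remembered rather than allowed to abort the loop, so every revoked task is still suspended before the first error is rethrown.

```java
import java.util.List;

class RevocationSketch {

    /** Hypothetical stand-in for a Streams task; not the real Kafka interface. */
    interface Task {
        void prepareCommit();

        void suspend();
    }

    /**
     * Prepares and suspends every revoked task. The first failure is remembered
     * instead of aborting the loop, and is rethrown only after all tasks have
     * been given the chance to suspend.
     */
    static void handleRevocation(List<Task> revokedTasks) {
        RuntimeException firstException = null;
        for (Task task : revokedTasks) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
            try {
                // Suspend even if prepareCommit failed, so no revoked task is
                // left in a running state.
                task.suspend();
            } catch (RuntimeException e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }
}
```

Whether a task whose `prepareCommit` failed should be suspended or closed dirty is a design decision this sketch does not settle.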
>
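For the second point, one possible way to keep the later exceptions without changing the thrown type is Java's standard suppressed-exception mechanism (`Throwable.addSuppressed` / `getSuppressed`). The sketch below is an assumption about how that could look, not what `ConsumerCoordinator` does today; `CallbackErrors` and `runAll` are hypothetical names, and the callbacks are modelled as plain `Runnable`s.

```java
import java.util.List;

class CallbackErrors {

    /**
     * Runs every callback. The first failure is rethrown with its original
     * type, and any later failures are attached to it as suppressed
     * exceptions so the caller can still inspect them.
     */
    static void runAll(List<Runnable> callbacks) {
        RuntimeException first = null;
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else if (e != first) {
                    // addSuppressed rejects self-suppression, hence the guard.
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

A caller that catches the first exception by its specific type keeps working unchanged, and can additionally walk `getSuppressed()` to decide, for example, that a suppressed `IllegalStateException` should stop the thread.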
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager -
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1]
> Failed to close task 0_3 cleanly. Attempting to close remaining
> tasks before re-throwing:
> [ 508.535] [service_application2] [inf]
> java.lang.IllegalStateException: Illegal state RUNNING while closing active
> task 0_3
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
> ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
> ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
> ~[kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
> [kafka-clients-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> dev.responsive.kafka.internal.clients.DelegatingConsumer.poll(DelegatingConsumer.java:94)
> [kafka-client-0.24.0-dc9acd1.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
> [kafka-streams-3.6.0.jar:?]
> [ 508.535] [service_application2] [inf] at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
> [kafka-streams-3.6.0.jar:?] {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)