[
https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang reopened KAFKA-5301:
----------------------------------
[~enothereska] I think we should not resolve this JIRA in a hurry. Have you
made a thorough pass over the consumer code path and confirmed that {{the rest
are OK}}?
For example, one obvious pitfall I can observe is the {{rebalanceException}}
used in the {{StreamThread}}: we throw the exception in
{{onPartitionsRevoked}} and {{onPartitionsAssigned}} and at the same time
remember it in this variable. The exception thrown from the callback is
swallowed by the {{ConsumerCoordinator}} and merely logged as an error, while
we later rethrow the remembered exception again. I can see two issues here:
1) Throwing the exception twice is redundant, since the first throw only
produces an error log4j entry. If we are going to rethrow the exception after
the rebalance anyway, we may consider not throwing it inside the callbacks at
all.
2) When we throw the exception in the {{..Revoked}} callback, we effectively
leave the assignor in an unstable state, since the suspended tasks / prev
tasks etc. are not set correctly; however, we will still call {{..Assigned}}
later, which may be problematic. Should we skip the later callback if an
exception has already been thrown, or should we clean up the cached maps when
throwing the exception?
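To make the double-throw concrete, here is a minimal, self-contained sketch
(these are stand-in classes, not the actual Kafka ones; only the field name
{{rebalanceException}} comes from {{StreamThread}}): the callback both records
the exception and throws it, the coordinator swallows the thrown copy and only
logs it, and the caller ends up rethrowing the remembered copy afterwards.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RebalanceExceptionSketch {

    // Stand-in for StreamThread's rebalanceException field.
    static final AtomicReference<RuntimeException> rebalanceException =
            new AtomicReference<>();

    // Stand-in for the onPartitionsRevoked callback: it remembers the
    // exception for a later rethrow AND throws it immediately.
    static void onPartitionsRevoked() {
        RuntimeException e = new IllegalStateException("failed to suspend tasks");
        rebalanceException.set(e); // remembered for the later rethrow
        throw e;                   // thrown now -- swallowed by the coordinator
    }

    // Stand-in for the coordinator invoking the listener: the thrown copy is
    // caught and only logged, so the rebalance proceeds regardless.
    static void invokeCallbackLikeCoordinator() {
        try {
            onPartitionsRevoked();
        } catch (RuntimeException e) {
            System.err.println("listener failed on partition revocation: " + e);
        }
    }

    public static void main(String[] args) {
        invokeCallbackLikeCoordinator();
        // Back in the poll loop, the remembered exception is rethrown -- the
        // second (and only effective) throw of the same exception.
        RuntimeException remembered = rebalanceException.getAndSet(null);
        if (remembered != null)
            throw remembered;
    }
}
```

The sketch shows why the first throw is redundant (it only yields a log line)
and why skipping it, or clearing the remembered state, would avoid running the
{{..Assigned}} callback on top of an already-failed revocation.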
> Improve exception handling on consumer path
> -------------------------------------------
>
> Key: KAFKA-5301
> URL: https://issues.apache.org/jira/browse/KAFKA-5301
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Eno Thereska
> Assignee: Eno Thereska
> Fix For: 0.11.1.0
>
>
> Used in StreamsThread.java, mostly to .poll() but also to restore data.
> Used in StreamsTask.java, mostly to .pause(), .resume()
> All exceptions here are currently caught all the way up to the main running
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that
> happen before streams has even had a chance to look at the data:
> https://issues.apache.org/jira/browse/KAFKA-5157
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)