[ 
https://issues.apache.org/jira/browse/FLINK-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174251#comment-15174251
 ] 

Stephan Ewen commented on FLINK-3368:
-------------------------------------

In Flink 1.0, that is pretty much how it works: If brokers rebalance, the 
affected partition is marked for re-assignment and picked up by a different 
broker connection.
This works pretty quickly, and the job does not notice anything.

We also tested it on Kafka installations where the brokers were out of sync 
concerning their metadata, so the cluster "rebalanced" many times very quickly.
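
The hand-over described above can be sketched roughly as follows. This is only 
an illustration, not the actual fetcher code: the class name, method names, and 
the "currentLeaders" map (standing in for a metadata lookup against the 
brokers) are all assumptions made up for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch; none of these names come from Flink's Kafka connector. */
class PartitionReassignmentSketch {

    // Partitions that lost their leader and currently have no broker connection.
    private final Queue<String> unassigned = new ArrayDeque<>();

    // Broker id -> partitions served by the connection to that broker.
    private final Map<Integer, List<String>> assignments = new HashMap<>();

    /** Gives a partition to the connection of the broker that leads it. */
    void assign(int brokerId, String partition) {
        assignments.computeIfAbsent(brokerId, id -> new ArrayList<>()).add(partition);
    }

    /** Called when a fetch reports that the broker no longer leads the partition. */
    void markForReassignment(int brokerId, String partition) {
        List<String> owned = assignments.get(brokerId);
        if (owned != null && owned.remove(partition)) {
            unassigned.add(partition);
        }
    }

    /**
     * Hands each marked partition to the connection of its current leader.
     * Partitions whose leader is not known yet stay queued for a later attempt.
     * Returns the number of partitions that were re-assigned.
     */
    int reassign(Map<String, Integer> currentLeaders) {
        int moved = 0;
        int pending = unassigned.size();
        for (int i = 0; i < pending; i++) {
            String partition = unassigned.poll();
            Integer leader = currentLeaders.get(partition);
            if (leader == null) {
                unassigned.add(partition); // metadata still out of sync, retry later
            } else {
                assign(leader, partition);
                moved++;
            }
        }
        return moved;
    }

    List<String> partitionsOf(int brokerId) {
        return assignments.getOrDefault(brokerId, Collections.emptyList());
    }

    int pendingCount() {
        return unassigned.size();
    }
}
```

The point of the queue is that a leader change on one partition only moves 
that partition; the other partitions on the same broker connection keep 
fetching, which is why the job as a whole does not have to fail and restart.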

> Kafka 0.8 consumer fails to recover from broker shutdowns
> ---------------------------------------------------------
>
>                 Key: FLINK-3368
>                 URL: https://issues.apache.org/jira/browse/FLINK-3368
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.0
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> It seems that the Kafka Consumer (0.8) fails to restart a job after it failed 
> due to a Kafka broker shutdown.
> {code}
> java.lang.Exception: Unable to get last offset for partitions [FetchPartition 
> {topic=a, partition=13, offset=-915623761776}, FetchPartition {topic=b, 
> partition=13, offset=-915623761776}, FetchPartition {topic=c, partition=13, 
> offset=-915623761776}, FetchPartition {topic=d, partition=13, 
> offset=-915623761776}, FetchPartition {topic=e, partition=13, 
> offset=-915623761776}, FetchPartition {topic=f, partition=13, 
> offset=-915623761776}, FetchPartition {topic=g, partition=13, 
> offset=-915623761776}].
> Exception for partition 13: kafka.common.NotLeaderForPartitionException
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>       at java.lang.Class.newInstance(Class.java:442)
>       at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>       at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>       at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
>       at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
> {code}
> I haven't understood the cause of this issue, but I'll investigate it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
