Hi,

I think Robert meant to say that you should set the connector dependency to 1.0-SNAPSHOT.
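
A minimal sketch of what that change might look like in the pom. The artifact ID flink-connector-kafka and the Apache snapshot repository entry are assumptions, not something stated in this thread, so check them against the existing pom before using them:

```xml
<!-- Sketch only: artifactId and repository URL are assumptions, adjust to your pom. -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <!-- Only the connector version changes; the connector is not part of the Flink binary. -->
    <version>1.0-SNAPSHOT</version>
  </dependency>
</dependencies>

<!-- SNAPSHOT artifacts are not published to Maven Central, so a snapshot repository is needed. -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>
```

With something like this in place, Maven should resolve the latest snapshot build of the connector on the next build.
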
Cheers,
Gyula

Robert Metzger <rmetz...@apache.org> wrote (on Tue, Dec 1, 2015, 17:10):

> Hi Mihail,
>
> the issue is actually a bug in Kafka. We have a JIRA in Flink for this as
> well: https://issues.apache.org/jira/browse/FLINK-3067
>
> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a
> fix.
>
> Since the kafka connector is not contained in the flink binary, you can
> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven will
> then download the code planned for the 0.10-SNAPSHOT release.
>
> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de>
> wrote:
>
>> Hi,
>>
>> we get the following NullPointerException after ~50 minutes when running
>> a streaming job with windowing and state that reads data from Kafka and
>> writes the result to local FS.
>> There are around 170 million messages to be processed, Flink 0.10.1 stops
>> at ~8 million.
>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>
>> 12/01/2015 15:06:24 Job execution switched to status RUNNING.
>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> java.lang.Exception
>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>
>>
>> Any ideas on what could cause this behaviour?
>>
>> Best,
>> Mihail