Hi,

I think Robert meant to write setting the connector dependency to
1.0-SNAPSHOT.

Cheers,
Gyula

Robert Metzger <rmetz...@apache.org> ezt írta (időpont: 2015. dec. 1., K,
17:10):

> Hi Mihail,
>
> the issue is actually a bug in Kafka. We have a JIRA in Flink for this as
> well: https://issues.apache.org/jira/browse/FLINK-3067
>
> Sadly, we haven't released a fix for it yet. Flink 0.10.2 will contain a
> fix.
>
> Since the kafka connector is not contained in the flink binary, you can
> just set the version in your maven pom file to 0.10-SNAPSHOT. Maven will
> then download the code planned for the 0.10-SNAPSHOT release.
>
> On Tue, Dec 1, 2015 at 4:54 PM, Vieru, Mihail <mihail.vi...@zalando.de>
> wrote:
>
>> Hi,
>>
>> we get the following NullPointerException after ~50 minutes when running
>> a streaming job with windowing and state that reads data from Kafka and
>> writes the result to local FS.
>> There are around 170 million messages to be processed, Flink 0.10.1 stops
>> at ~8 million.
>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>
>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>> to SCHEDULED
>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>> to DEPLOYING
>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
>> SCHEDULED
>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
>> DEPLOYING
>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>> to RUNNING
>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
>> RUNNING
>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
>> CANCELED
>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched
>> to FAILED
>> java.lang.Exception
>>     at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>     at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>     at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>     at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>     at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at
>> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>     at
>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>     at
>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>     at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>
>>
>> Any ideas on what could cause this behaviour?
>>
>> Best,
>> Mihail
>>
>
>

Reply via email to