Hi Stefanos,

this looks like an issue with Kafka. Which version does your broker have?
Can you check the logs of the broker you are trying to connect to?

On Fri, Mar 11, 2016 at 5:27 PM, Stefanos Antaris <
antaris.stefa...@gmail.com> wrote:

> Hi to all,
>
> i am trying to make Flink to work with Kafka but i always have the
> following exception. It works perfect on my laptop but when i try to use it
> on the cluster it always fails.
>
> java.lang.Exception
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
>       at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>       at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>       at 
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
>       at 
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:759)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getMissingOffsetsFromKafka(LegacyFetcher.java:712)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:462)
>
>
>
> Here is my pom.xml if it helps with the error
>
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-core</artifactId>
> <version>1.0.0</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.10</artifactId>
> <version>1.0.0</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.10</artifactId>
> <version>1.0.0</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
> <version>1.0.0</version>
> </dependency>
>
>
> </dependencies>
>
> Best regards,
> Stefanos
>

Reply via email to