Can you double-check that the Kafka instance is up?
The code looks fine.
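
One quick way to check is a plain TCP connect to the broker address from the machine that runs the Flink job. The sketch below is only an illustration: the class name BrokerCheck, the localhost:9092 default, and the 3-second timeout are assumptions, not values from your setup. A successful connect only shows that the port is open; it does not prove that the consumer can fetch topic metadata.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
public class BrokerCheck {

    // Returns true if a TCP connection is established within timeoutMs milliseconds.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean up = isReachable(host, port, 3000);
        System.out.println(host + ":" + port + (up ? " is reachable" : " is NOT reachable"));
    }
}
```

If this reports the broker as unreachable, the problem is with the Kafka instance or the address in bootstrap.servers rather than with the Flink code.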

Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er <eyal.p...@startapp.com> wrote:

> Hi,
>
> I'm trying to consume events using Apache Flink.
>
> The code is very basic: it connects to the topic, splits the words on
> spaces, and prints them to the console. The Kafka version is 0.9.
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
> public class KafkaStreaming {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "kafka servers:9092...");
>         props.setProperty("group.id", "flinkPOC");
>
>         FlinkKafkaConsumer09<String> consumer =
>                 new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
>
>         DataStream<String> dataStream = env.addSource(consumer);
>         DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
>         wordDataStream.print();
>
>         env.execute("Word Split");
>     }
>
>     public static class Splitter implements FlatMapFunction<String, String> {
>
>         public void flatMap(String sentence, Collector<String> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(word);
>             }
>         }
>     }
> }
>
>
>
> The app does not print anything to the screen (although I produced events
> to Kafka).
>
> I tried removing the Splitter FlatMap function, but still nothing happens.
> Kafka does not require SSL or any other kind of authentication.
>
> This is the error that I found in the logs:
>
> 2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>     - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1)
>     (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
>
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
> fetching topic metadata
>
>
>
> The Kafka topic has only one partition, so the topic metadata should be
> very basic.
>
> I ran Kafka and Flink locally to rule out network-related issues, but
> the problem persists. So my assumption is that I'm doing something
> wrong.
>
> Has anyone encountered this issue? Does someone have different code for
> consuming Kafka events?
>
>
>
> Best regards
>
> Eyal Peer / Data Platform Developer
>
>
>
