Below is my setup 
        1. Kafka zookeeper and server in one machine (192.168.1.116) and
producer (192.168.1.100) and consumer (192.168.1.117) in another machine.  
--> This work fine no issue 
        2. Running standalone beam application with kafka consumer --> This
work fine
        3. Running beam application in flink cluster with kafka consumer -->
This is not working
  Not receiving message from kafka producer.

Same program works fine with standalone with flink runner.
Below is my code snippet.

public static void main(String[] args) {
    Pipeline p = initializePipeline(args);
    Map<String, List&lt;String>> intelliOmIms = new TreeMap<>();

    PTransform<PBegin, PCollection&lt;KV&lt;Integer, byte[]>>> reader;
    reader = KafkaIO.<Integer, byte[]>read()
            .withBootstrapServers("192.168.1.116:9092")    --->Kafka
zookeeper and server running
            .withTopic("kafkatest")
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
            .withoutMetadata();

    PCollection<KV&lt;Integer, byte[]>> output = p.apply(reader);
    output.apply(ParDo.of(new PrintMsg()));

    p.run().waitUntilFinish();
}

  In IntelliOmImsKpiDataUtil deserializer I am just printing message saying
that kafka is received the message.

public static class PrintMsg extends DoFn<KV&lt;Integer, byte[]>, Void> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println("Received Message .... from kafkatest Topic ");
    }
}

  Started Zookeeper in 192.168.1.116 like below :
    bin/zookeeper-server-start.sh config/zookeeper.properties
  
  Started Server in 192.168.1.116 like below :
    bin/kafka-server-start.sh config/server.properties
  
  Started Producer in 192.168.1.100 like below :
    bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic
kafkatest

  Started Consumer in 192.168.1.117 like below :
    bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic
kafkatest --from-beginning

   With standalone beam application kafka can receive the message, But in
cluster setup it is not working.

Can you please help me to check it. 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to