Hi,

What do you mean by:

> With standalone beam application kafka can receive the message, But in
cluster setup it is not working.

In your example you are reading the data from Kafka and printing them to 
console. There doesn’t seems to be anything that writes back to Kafka, so what 
do you mean by “Kafka can not receive the message”?

Did you check the output file of your application in the log directory? Did you 
check Flink logs if there are any errors?

Piotrek

> On 12 Oct 2017, at 15:49, Shankara <shankara....@gmail.com> wrote:
> 
> Below is my setup 
>        1. Kafka zookeeper and server in one machine (192.168.1.116) and
> producer (192.168.1.100) and consumer (192.168.1.117) in another machine.  
> --> This work fine no issue 
>        2. Running standalone beam application with kafka consumer --> This
> work fine
>        3. Running beam application in flink cluster with kafka consumer -->
> This is not working
>  Not receiving message from kafka producer.
> 
> Same program works fine with standalone with flink runner.
> Below is my code snippet.
> 
> public static void main(String[] args) {
>    Pipeline p = initializePipeline(args);
>    Map<String, List&lt;String>> intelliOmIms = new TreeMap<>();
> 
>    PTransform<PBegin, PCollection&lt;KV&lt;Integer, byte[]>>> reader;
>    reader = KafkaIO.<Integer, byte[]>read()
>            .withBootstrapServers("192.168.1.116:9092")    --->Kafka
> zookeeper and server running
>            .withTopic("kafkatest")
>            .withKeyDeserializer(IntegerDeserializer.class)
>            .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
>            .withoutMetadata();
> 
>    PCollection<KV&lt;Integer, byte[]>> output = p.apply(reader);
>    output.apply(ParDo.of(new PrintMsg()));
> 
>    p.run().waitUntilFinish();
> }
> 
>  In IntelliOmImsKpiDataUtil deserializer I am just printing message saying
> that kafka is received the message.
> 
> public static class PrintMsg extends DoFn<KV&lt;Integer, byte[]>, Void> {
> 
>    @ProcessElement
>    public void processElement(ProcessContext c) {
>        System.out.println("Received Message .... from kafkatest Topic ");
>    }
> }
> 
>  Started Zookeeper in 192.168.1.116 like below :
>    bin/zookeeper-server-start.sh config/zookeeper.properties
> 
>  Started Server in 192.168.1.116 like below :
>    bin/kafka-server-start.sh config/server.properties
> 
>  Started Producer in 192.168.1.100 like below :
>    bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic
> kafkatest
> 
>  Started Consumer in 192.168.1.117 like below :
>    bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic
> kafkatest --from-beginning
> 
>   With standalone beam application kafka can receive the message, But in
> cluster setup it is not working.
> 
> Can you please help me to check it. 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to