Hi,

What version of Flink are you using? Some earlier 1.3.x releases had bugs in
the Kafka consumer code.
Could you change the log level in Flink to DEBUG?
Did you check the Kafka logs for any hints?
I guess that metrics like bytes read/input records of this Flink application
are not changing?
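
In case it helps with the log level, here is a minimal sketch of the change,
assuming the cluster still uses the default log4j 1.x setup
(conf/log4j.properties on each node) and that the root appender is named
"file" as in the stock configuration; adjust the names if your setup differs:

```properties
# conf/log4j.properties (assumed default location in the Flink distribution)
# Raise the root level from INFO to DEBUG; "file" is assumed to be the existing appender name.
log4j.rootLogger=DEBUG, file

# Also log the Kafka client at DEBUG, since the consumer is the suspect here.
log4j.logger.org.apache.kafka=DEBUG
```

The cluster most likely needs a restart for this to take effect, and the extra
output should then show up in the JobManager and TaskManager log files.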

Piotrek

> On 13 Oct 2017, at 07:51, Shankara <shankara....@gmail.com> wrote:
> 
> Hi,
> 
>    I mean the same code works fine in a local Flink setup. I am able to see
> "Received Message .... from testkafka Topic : " on the console when Kafka
> receives a message (the Kafka producer is on another machine and frequently
> sends messages to the testkafka topic).
> 
>     *Submitted the Beam application to local Flink with the command below:*
>         mvn compile exec:java
> -Dexec.mainClass=org.apache.beam.influxdb.KafkaRead  -Pflink-runner
> 
>     *Output is :*
> Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608]
> with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
> 10/13/2017 11:09:09   Job execution switched to status RUNNING.
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
> 10/13/2017 11:09:09   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
> *Received in Deserilize..
> Received Message .... from testkafka Topic : HELLOASA*
> 
> 
> 
>    If I run the same code on the Flink cluster, I cannot see any message in
> log/stdout, but the job keeps running while the Kafka producer on the other
> machine frequently sends messages to the testkafka topic.
> 
>  *I started the Flink cluster with the command below:*
>       bin/start-cluster.sh
> 
>   *Submitted the Beam application to the Flink cluster with the command below:*
>      bin/flink run -c org.apache.beam.influxdb.KafkaRead
> /home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
> --runner=FlinkRunner --flinkMaster=192.168.1.116
> --filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
>     
> 
>   In the dashboard:
> 
> 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/DashBoard.png>
>  
> 
> 
> 
>    I cannot see any message in the dashboard:
> 
> 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/Stdout.png>
>  
> 
> 
>   As per the log, the job is running:
> Cluster configuration: Standalone cluster with JobManager at
> /192.168.1.116:6123
> Using address 192.168.1.116:6123 to connect to JobManager.
> JobManager web interface address http://192.168.1.116:8081
> Starting execution of program
> Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job
> completion.
> Connected to JobManager at
> Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with
> leader session id 00000000-0000-0000-0000-000000000000.
> 10/13/2017 11:10:57   Job execution switched to status RUNNING.
> 10/13/2017 11:10:57   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
> 10/13/2017 11:10:57   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
> 10/13/2017 11:11:05   Source: Read(UnboundedKafkaSource) -> Flat Map ->
> ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
> 
>   There is no exception in the log. I suspect the Kafka deployment has an issue.
> 
> Can you please help me check it?
> 
> 
> 
> 
> public static void main(String[] args) { 
>    Pipeline p = initializePipeline(args); 
>    Map<String, List<String>> intelliOmIms = new TreeMap<>();
> 
>    PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
>    reader = KafkaIO.<Integer, byte[]>read()
>           .withBootstrapServers("192.168.1.116:9092") // Kafka ZooKeeper and server running here
>           .withTopic("kafkatest")
>           .withKeyDeserializer(IntegerDeserializer.class)
>           .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
>           .withoutMetadata();
> 
>    PCollection<KV<Integer, byte[]>> output = p.apply(reader);
>    output.apply(ParDo.of(new PrintMsg())); 
> 
>    p.run().waitUntilFinish(); 
> } 
> 
> public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {
> 
>        @ProcessElement
>        public void processElement(ProcessContext c) {
> 
>            try {
>                System.out.println("Received Message .... from testkafka Topic : "
>                        + new String(c.element().getValue(), "UTF-8"));
>            } catch (UnsupportedEncodingException e) {
>                e.printStackTrace();
>            }
>        }
>    }
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
