Hi,

    To clarify, the same code works fine in a Flink local setup. I can see
"Received Message .... from testkafka Topic : " on the console whenever Kafka
receives a message (the Kafka producer runs on another machine and frequently
sends messages to the testkafka topic).
     
     *I submitted the Beam application to Flink local with the command below:*
         mvn compile exec:java -Dexec.mainClass=org.apache.beam.influxdb.KafkaRead -Pflink-runner

     *The output is:*
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608]
with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
10/13/2017 11:09:09     Job execution switched to status RUNNING.
10/13/2017 11:09:09     Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
10/13/2017 11:09:09     Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
10/13/2017 11:09:09     Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
*Received in Deserilize..
Received Message .... from testkafka Topic : HELLOASA*



    If I run the same code on a Flink cluster, I cannot see any message in
the log or stdout. The job keeps running, and the Kafka producer on the other
machine is still frequently sending messages to the testkafka topic.

  *I started the Flink cluster with the command below:*
       bin/start-cluster.sh

   *I submitted the Beam application to the Flink cluster with the command below:*
      bin/flink run -c org.apache.beam.influxdb.KafkaRead \
        /home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar \
        --runner=FlinkRunner --flinkMaster=192.168.1.116 \
        --filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
    

   In the dashboard:

  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/DashBoard.png>
 



    I cannot see any message in the dashboard:

     
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1169/Stdout.png>
 


   According to the log, the job is running:
Cluster configuration: Standalone cluster with JobManager at
/192.168.1.116:6123
Using address 192.168.1.116:6123 to connect to JobManager.
JobManager web interface address http://192.168.1.116:8081
Starting execution of program
Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job
completion.
Connected to JobManager at
Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with
leader session id 00000000-0000-0000-0000-000000000000.
10/13/2017 11:10:57     Job execution switched to status RUNNING.
10/13/2017 11:10:57     Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
10/13/2017 11:10:57     Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
10/13/2017 11:11:05     Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 

   There is no exception in the log. I suspect the Kafka deployment has an issue.

Could you please help me check it?
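
To rule out basic connectivity, a check like the one below could be run on each TaskManager machine. It is only a sketch: the class name BrokerCheck, the method canConnect and the 3-second timeout are my own choices and not part of the job. The host and port are the broker address used in my pipeline. A successful TCP connect only shows that the port is open; it does not prove that the consumer can actually fetch records.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {

    // Returns true if a TCP connection to host:port opens within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker address taken from withBootstrapServers(...) in the job.
        String host = "192.168.1.116";
        int port = 9092;
        System.out.println("Broker " + host + ":" + port
                + " reachable: " + canConnect(host, port, 3000));
    }
}
```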




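 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.kafka.common.serialization.IntegerDeserializer;

 public static void main(String[] args) {
    Pipeline p = initializePipeline(args);
    Map<String, List<String>> intelliOmIms = new TreeMap<>();

    PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
    reader = KafkaIO.<Integer, byte[]>read()
            // Kafka ZooKeeper and server are running on this host
            .withBootstrapServers("192.168.1.116:9092")
            .withTopic("kafkatest")
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
            .withoutMetadata();

    PCollection<KV<Integer, byte[]>> output = p.apply(reader);
    output.apply(ParDo.of(new PrintMsg()));

    p.run().waitUntilFinish();
 }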

public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            // Print the Kafka record value decoded as UTF-8.
            System.out.println("Received Message .... from testkafka Topic : "
                    + new String(c.element().getValue(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
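
As a side note, the decoding step in PrintMsg could probably be simplified. The sketch below uses StandardCharsets.UTF_8, which avoids the checked UnsupportedEncodingException and so the try/catch. The class name MessageText and the method describe are made up for illustration and are not in my job; the sketch leaves out Beam so that it runs on its own.

```java
import java.nio.charset.StandardCharsets;

public class MessageText {

    // Builds the same line PrintMsg prints, decoding the payload as UTF-8.
    static String describe(byte[] value) {
        return "Received Message .... from testkafka Topic : "
                + new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample payload matching the message seen in the local run.
        byte[] payload = "HELLOASA".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(payload));
    }
}
```

Inside processElement the call would then be describe(c.element().getValue()).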


