I am using Apex 3.5.0 and Kafka 0.9, with version 3.6.0 of the Malhar
library.
I am following this example:
https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
I am publishing messages to a Kafka topic with the console producer that
ships with Kafka, but the Apex DAG I built from the example above does not
read them.
I am launching the .apa file through the Apex CLI:
Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
1. Kafka2HDFS
2. MyFirstApplication
Choose application: 1
Nothing happens after this, even though I can see the messages with the
Kafka console consumer and in the Kafka logs.
The properties I am using are:
<property>
  <name>dt.operator.kafkaIn.prop.topics</name>
  <value>kafka2hdfs</value>
</property>
<property>
  <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
  <value>localhost:2181</value>
</property>
<property>
  <name>dt.operator.kafkaIn.prop.clusters</name>
  <value>localhost:9092</value>
</property>
<property>
  <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
  <value>1</value>
</property>
Application code:
KafkaSinglePortByteArrayInputOperator in =
    dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
dag.addStream("dataf", in.outputPort, out.input);
What configuration am I missing here?