Also, you may need to set the initial offset to EARLIEST.

Ajay
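
For reference, a minimal sketch of how that setting might look in properties.xml. It assumes the operator in use is the Kafka 0.9 input operator from the Malhar kafka module, which exposes an initialOffset property, and it follows the dt.operator.kafkaIn.prop.* naming used in the properties quoted below. If the older 0.8 operator is in use, the property name may differ, so check it against the operator class you import.

```xml
<!-- Assumption: the operator registered as "kafkaIn" has an initialOffset property -->
<property>
  <name>dt.operator.kafkaIn.prop.initialOffset</name>
  <value>EARLIEST</value>
</property>
```

With EARLIEST the operator starts from the oldest offset still available in the topic, so messages produced before the application was launched are read as well.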

On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvik...@gmail.com> wrote:

> Hi Sushil,
>
> Have you provided configuration specifying hdfs directory and file for
> an application?
> You may have to create /tmp/fromKafka directory in hdfs.
>
> Thanks & Regards,
> Vikram
>
> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com>
> wrote:
> >
> > I am using Apex 3.5.0 and kafka 0.9
> > malhar library used is 3.6.0
> >
> > I am following the example from
> > https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
> >
> > I am putting messages in Kafka topic using consolekafkaProducer provided
> > by Kafka, but I am not able to read these messages in Apex DAG (created
> > based on above link).
> >
> > I am running the apex apa file through apex-cli
> >
> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
> > 1. Kafka2HDFS
> > 2. MyFirstApplication
> > Choose application: 1
> >
> > and nothing happens after this, I can see the messages put in
> > consoleConsumer in kafka logs
> >
> > Properties used are
> >
> > <property>
> >   <name>dt.operator.kafkaIn.prop.topics</name>
> >   <value>kafka2hdfs</value>
> > </property>
> >
> > <property>
> >   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
> >   <value>localhost:2181</value>
> > </property>
> > <property>
> >   <name>dt.operator.kafkaIn.prop.clusters</name>
> >   <value>localhost:9092</value>
> > </property>
> > <property>
> >   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
> >   <value>1</value>
> > </property>
> >
> >
> > Application Code
> >
> > KafkaSinglePortByteArrayInputOperator in
> >     = dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
> >
> > LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
> >
> > dag.addStream("dataf", in.outputPort, out.input);
> >
> >
> > I am not able to understand what config I am missing here?