Hi Sushil,

Have you provided configuration specifying the HDFS directory and file name for the application's output? You may have to create the /tmp/fromKafka directory in HDFS.

Thanks & Regards,
Vikram

On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com> wrote:
>
> I am using Apex 3.5.0 and kafka 0.9
> malhar library used is 3.6.0
>
> I am following the example from
> https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
>
> I am putting messages in Kafka topic using consolekafkaProducer provided by
> Kafka, but I am not able to read these messages in Apex DAG(created based on
> above link).
>
> I am running the apex apa file through apex-cli
>
> Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch:
> 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
> apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
> 1. Kafka2HDFS
> 2. MyFirstApplication
> Choose application: 1
>
> and nothing happens after this, I can see the messages put in consoleConsumer
> in kafka logs
>
> Properties used are
>
> <property>
>   <name>dt.operator.kafkaIn.prop.topics</name>
>   <value>kafka2hdfs</value>
> </property>
>
> <property>
>   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
>   <value>localhost:2181</value>
> </property>
> <property>
>   <name>dt.operator.kafkaIn.prop.clusters</name>
>   <value>localhost:9092</value>
> </property>
> <property>
>   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
>   <value>1</value>
> </property>
>
>
> Application Code
>
> KafkaSinglePortByteArrayInputOperator in
>     = dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
>
> LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>
> dag.addStream("dataf", in.outputPort, out.input);
>
>
> I am not able to understand what config I am missing here?
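
For reference, the properties listed in the quoted message configure only the kafkaIn operator; nothing is set for the fileOut operator. A minimal sketch of the kind of output configuration the reply is asking about might look like the fragment below. The property names filePath and baseName are assumptions based on how the linked tutorial's LineOutputOperator appears to be configured, and the baseName value is purely illustrative; please check both against the operator source in your copy of the example before relying on them.

```xml
<!-- Sketch only: property names are assumed, not confirmed in this thread. -->

<!-- Output directory for the fileOut operator; /tmp/fromKafka is the path
     mentioned in the reply and may need to exist before launch. -->
<property>
  <name>dt.operator.fileOut.prop.filePath</name>
  <value>/tmp/fromKafka</value>
</property>

<!-- Base name of the output file; "test" is a hypothetical value. -->
<property>
  <name>dt.operator.fileOut.prop.baseName</name>
  <value>test</value>
</property>
```

These would sit alongside the existing dt.operator.kafkaIn.* entries in the same properties file, since the operator name in each key (fileOut) matches the name passed to dag.addOperator in the application code.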