Hi Sushil,

Have you provided the configuration that specifies the HDFS directory and
file name for the application's output?
You may also have to create the /tmp/fromKafka directory in HDFS.
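
As a rough sketch, the missing output configuration might look like the
following. It assumes the output operator keeps the name "fileOut" from your
application code. filePath is the directory property of Malhar's
AbstractFileOutputOperator. baseName is an assumption: I am assuming the
tutorial's LineOutputOperator exposes it for the output file name, and the
value "test" is only a placeholder.

```xml
<!-- Directory the file output operator writes into -->
<property>
  <name>dt.operator.fileOut.prop.filePath</name>
  <value>/tmp/fromKafka</value>
</property>
<!-- Assumed property of the tutorial's LineOutputOperator; placeholder value -->
<property>
  <name>dt.operator.fileOut.prop.baseName</name>
  <value>test</value>
</property>
```
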
Thanks & Regards,
Vikram

On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com> wrote:
>
> I am using Apex 3.5.0 and Kafka 0.9.
> The Malhar library version is 3.6.0.
>
> I am following the example from 
> https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
>
> I am putting messages into a Kafka topic using the console producer provided
> by Kafka, but I am not able to read these messages in the Apex DAG (created
> based on the above link).
>
> I am running the Apex .apa file through apex-cli:
>
> Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
> apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
>   1. Kafka2HDFS
>   2. MyFirstApplication
> Choose application: 1
>
> Nothing happens after this, although I can see the messages in the Kafka
> console consumer and in the Kafka logs.
>
> Properties used are
>
> <property>
>   <name>dt.operator.kafkaIn.prop.topics</name>
>   <value>kafka2hdfs</value>
> </property>
>
> <property>
>   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
>   <value>localhost:2181</value>
> </property>
> <property>
>   <name>dt.operator.kafkaIn.prop.clusters</name>
>   <value>localhost:9092</value>
> </property>
> <property>
>   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
>   <value>1</value>
> </property>
>
>
> Application Code
>
> KafkaSinglePortByteArrayInputOperator in =
>     dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
>
> LineOutputOperator out =
>     dag.addOperator("fileOut", new LineOutputOperator());
>
> dag.addStream("dataf", in.outputPort, out.input);
>
>
> I am not able to work out which configuration I am missing here.
