Also, you may need to set the operator's initial offset to EARLIEST so that messages already in the topic are read.
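
For example, assuming the Kafka input operator is named kafkaIn as in the properties quoted below, and assuming the operator exposes an initialOffset property (please check this against your Malhar version), the entry in the application's properties file might look something like this:

```xml
<!-- Sketch only: the property name initialOffset is an assumption to verify
     against the Kafka input operator in your Malhar release. -->
<property>
  <name>dt.operator.kafkaIn.prop.initialOffset</name>
  <value>EARLIEST</value>
</property>
```

If the operator starts from the latest offset, messages produced before the application was launched would not be delivered, which could look like nothing happening.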

Ajay

On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvik...@gmail.com> wrote:

> Hi Sushil,
>
> Have you provided the configuration specifying the HDFS directory and file
> for the application?
> You may have to create the /tmp/fromKafka directory in HDFS.
> Thanks & Regards,
> Vikram
>
> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com>
> wrote:
> >
> > I am using Apex 3.5.0 and Kafka 0.9.
> > The Malhar library used is 3.6.0.
> >
> > I am following the example from
> > https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
> >
> > I am putting messages into a Kafka topic using the console producer
> > provided by Kafka, but I am not able to read these messages in the Apex
> > DAG (created based on the above link).
> >
> > I am running the apex apa file through apex-cli
> >
> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
> >   1. Kafka2HDFS
> >   2. MyFirstApplication
> > Choose application: 1
> >
> > Nothing happens after this, although I can see the messages in Kafka's
> > console consumer output.
> >
> > Properties used are
> >
> > <property>
> >   <name>dt.operator.kafkaIn.prop.topics</name>
> >   <value>kafka2hdfs</value>
> > </property>
> >
> > <property>
> >   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
> >   <value>localhost:2181</value>
> > </property>
> > <property>
> >   <name>dt.operator.kafkaIn.prop.clusters</name>
> >   <value>localhost:9092</value>
> > </property>
> > <property>
> >   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
> >   <value>1</value>
> > </property>
> >
> >
> > Application Code
> >
> > KafkaSinglePortByteArrayInputOperator in
> >     = dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
> >
> > LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
> >
> > dag.addStream("dataf", in.outputPort, out.input);
> >
> >
> > I am not able to work out which config I am missing here.
>
