I was putting together a small application to read from Kafka
using the new 0.9 input
operator from Apex Malhar and ran into a strange problem, so I thought it
would be useful to share
it and the resolution with the list.

Here is my application code (*LineOutputOperator* is a trivial extension of
the *AbstractFileOutputOperator*):

  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setInitialPartitionCount(1);
    in.setTopics("test");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    in.setClusters("localhost:2181");

    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
    out.setFilePath("/tmp/FromKafka");
    out.setFileName("test");
    out.setMaxLength(1024);        // max size of rolling output file

    // create stream connecting input adapter to output adapter
    dag.addStream("data", in.outputPort, out.input);
  }

This was failing when deployed to the cluster: the application was stuck
in the ACCEPTED state, and there
were no errors in either the Resource Manager or the Application Master
logs.

It turned out that the *setClusters()* call needs a *broker
address*, not the ZooKeeper address, so simply changing the port from 2181
to 9092 fixed the problem (thanks to Thomas for
pointing this out). This was not obvious since the Kafka command-line
producer script takes a
ZooKeeper address.
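
For reference, the corrected call is just (assuming the broker is running
on the default port on the same host):

```java
// Point setClusters() at the Kafka broker (default port 9092),
// not at ZooKeeper (default port 2181)
in.setClusters("localhost:9092");
```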

There is also a programmatic way to get the broker info from zookeeper as
described in:
http://stackoverflow.com/questions/29490113/kafka-get-broker-host-from-zookeeper
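
The gist of that approach is to read the children of the /brokers/ids znode
and parse the host and port out of the JSON each broker registers there
(in 0.8/0.9 it looks roughly like
{"jmx_port":-1,"host":"localhost","version":1,"port":9092}). A minimal
sketch of just the parsing step; the BrokerInfo class name and the regex
approach are mine, and the actual znode fetch (via a ZooKeeper client)
is elided:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract "host:port" from the JSON blob a Kafka 0.8/0.9 broker
// registers under /brokers/ids/<id> in ZooKeeper. In a real application
// you would first fetch that blob with a ZooKeeper client (or use a JSON
// library instead of regexes).
public class BrokerInfo {
    private static final Pattern HOST = Pattern.compile("\"host\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern PORT = Pattern.compile("\"port\"\\s*:\\s*(\\d+)");

    public static String brokerAddress(String json) {
        Matcher h = HOST.matcher(json);
        Matcher p = PORT.matcher(json);
        if (h.find() && p.find()) {
            return h.group(1) + ":" + p.group(1);
        }
        throw new IllegalArgumentException("unexpected broker registration: " + json);
    }

    public static void main(String[] args) {
        String sample = "{\"jmx_port\":-1,\"host\":\"localhost\",\"version\":1,\"port\":9092}";
        System.out.println(brokerAddress(sample));  // prints localhost:9092
    }
}
```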

Hopefully, this will be useful to others who might run into the same
problem.

Ram
