Ram,

Kafka is moving away from client-side use of ZooKeeper, and the change in the
new input operator reflects that. The client now locates the partitions
through the initial broker(s).
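
As a rough illustration of what broker-based bootstrapping looks like on the client side, here is a minimal sketch of a configuration for the Kafka 0.9 consumer. It is not taken from the operator source; the class name, method name, group id, and the choice of byte-array deserializers are assumptions made for the example. The point is only that the configuration names brokers and contains no ZooKeeper setting.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of Apex Malhar.
final class ConsumerConfigSketch {

    // Builds properties for the Kafka 0.9 consumer, which bootstraps from
    // one or more brokers rather than from a ZooKeeper connect string.
    static Properties newConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Comma-separated broker host:port pairs; the client discovers the
        // rest of the cluster and the topic partitions from these.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // 9092 is the default broker port; "example-group" is a made-up id.
        Properties props = newConsumerProps("localhost:9092", "example-group");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

These properties would normally be handed to a `KafkaConsumer` from the kafka-clients library, which is left out here so the sketch compiles with the JDK alone.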

Thomas




On Sat, Apr 16, 2016 at 9:36 PM, Munagala Ramanath <[email protected]>
wrote:

> I was putting together a small application to read from Kafka using the new
> 0.9 input operator from Apex Malhar and ran into a strange problem, so I
> thought it would be useful to share it and the resolution with the list.
>
> Here is my application code (*LineOutputOperator* is a trivial extension of
> the *AbstractFileOutputOperator*):
>
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
>     in.setInitialPartitionCount(1);
>     in.setTopics("test");
>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>     in.setClusters("localhost:2181");
>
>     LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
>     out.setFilePath("/tmp/FromKafka");
>     out.setFileName("test");
>     out.setMaxLength(1024);        // max size of rolling output file
>
>     // create stream connecting input adapter to output adapter
>     dag.addStream("data", in.outputPort, out.input);
>   }
>
> The application failed on deployment to the cluster: it stayed stuck in the
> ACCEPTED state, with no errors in either the Resource Manager or the
> Application Master logs.
>
> It turned out that the *setClusters()* call needs a *broker address*, not
> the ZooKeeper address, so simply changing the port from 2181 to 9092 fixed
> the problem (thanks to Thomas for pointing this out). This was not obvious
> since the Kafka command line producer script takes a ZooKeeper address.
>
> There is also a programmatic way to get the broker info from ZooKeeper, as
> described in:
>
> http://stackoverflow.com/questions/29490113/kafka-get-broker-host-from-zookeeper
>
> Hopefully, this will be useful to others who might run into the same
> problem.
>
> Ram
>
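
Because the mistake described above produced no error in any log, a small pre-flight check on the address string can catch it before deployment. The following is a minimal sketch, not part of Apex Malhar: the class and method names are hypothetical, and it assumes the value is a comma-separated list of host:port entries. It flags any entry that uses 2181, ZooKeeper's default client port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check, for illustration only; not part of Apex Malhar.
final class BrokerAddressCheck {

    // ZooKeeper's default client port; Kafka brokers default to 9092.
    static final int ZOOKEEPER_DEFAULT_PORT = 2181;

    // Returns the entries of a comma-separated host:port list whose port is
    // ZooKeeper's default client port. Entries without a numeric port are
    // skipped rather than reported.
    static List<String> suspectEntries(String addresses) {
        List<String> suspects = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                continue; // no port given
            }
            try {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                if (port == ZOOKEEPER_DEFAULT_PORT) {
                    suspects.add(trimmed);
                }
            } catch (NumberFormatException e) {
                // Non-numeric port: nothing to compare, so ignore the entry.
            }
        }
        return suspects;
    }

    public static void main(String[] args) {
        // The value from the failing application above.
        for (String s : suspectEntries("localhost:2181")) {
            System.out.println("Looks like a ZooKeeper address, not a broker: " + s);
        }
    }
}
```

A check like this is only a heuristic, since either service can be configured to listen on a non-default port.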
