Hi Raja,

   I think you are using 0.8 version of kafka operator. There is no such
operator in Malhar.  To meet your requirement, please do as below:

  Create a new class which extend from AbstractKafkaInputOperator. Override
the API "void emitTuples()" and create the output port of type
MutablePair<Message,MutablePair<long,int>>

Copy the emitTuples() from AbstractKafkaInputOperator and change the below
line:
emitTuple(message.msg) to
outputPort.emit(new MutablePair<>(message.getMsg(),new
MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));

Regards,
Chaitanya


On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <[email protected]
> wrote:

>
> Hi
>
> Does anyone have an idea, if any of the existing kafka input operators
> give the ability to retrieve  kafka Partition ID & Offset a particular
> message came from, along with the messages ?
>
>
> Thanks a lot in advance.
>
>
> Regards,
> Raja.
>

Reply via email to