Hi Raja,

I have quickly wrote an operator to fulfill your requirement. The code is
available here
<https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
Let me know if this addresses your usecase.

-Priyanka

On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <[email protected]>
wrote:

> Hi Raja,
>
> You will need to update other places as well (I guess it's replay other
> than emitTuples) . But I think it is not feasible to replicate emitTuples
> code in subclass as many of the parent class variables are private. I would
> try to figure out if there is any other way.
>
> Can you please confirm which Kafka version you are using?
>
> -Priyanka
>
> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>> Hi Chaitanya,
>>
>> Would the below changes you proposed enough to retrieve partition &
>> offset ?
>>
>> I see *emitTuple(Message msg) i*s being called at various places in the
>> code… please advise. Thank you.
>>
>>
>> Regards,
>> Raja.
>>
>> From: "Raja.Aravapalli" <[email protected]>
>> Date: Tuesday, June 14, 2016 at 9:50 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Kafka input operator
>>
>>
>> Thanks for the response Chaitanya. I will follow the suggestions to
>> retrieve Kafka partitionId & offset!!
>>
>>
>> Regards,
>> Raja.
>>
>> From: Chaitanya Chebolu <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Monday, June 13, 2016 at 3:06 AM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: Kafka input operator
>>
>> Hi Raja,
>>
>>    I think you are using 0.8 version of kafka operator. There is no such
>> operator in Malhar.  To meet your requirement, please do as below:
>>
>>   Create a new class which extend from AbstractKafkaInputOperator.
>> Override the API "void emitTuples()" and create the output port of type
>> MutablePair<Message,MutablePair<long,int>>
>>
>> Copy the emitTuples() from AbstractKafkaInputOperator and change the
>> below line:
>> emitTuple(message.msg) to
>> outputPort.emit(new MutablePair<>(message.getMsg(),new
>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
>>
>> Regards,
>> Chaitanya
>>
>>
>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>> Hi
>>>
>>> Does anyone have an idea, if any of the existing kafka input operators
>>> give the ability to retrieve  kafka Partition ID & Offset a particular
>>> message came from, along with the messages ?
>>>
>>>
>>> Thanks a lot in advance.
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>
>>
>

Reply via email to