Hi,

Yes, sure, you can use any package name you want. In fact, it is better to
keep this class outside the Malhar jar. Just keep the Malhar jar on your
classpath.
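
For reference, here is a minimal sketch of the tuple shape discussed further down the thread (message plus offset and partition id). It assumes nothing from Malhar: `Pair` is a hypothetical stand-in for MutablePair, the payload is a plain String rather than a Kafka Message, and the class and method names are made up for illustration. It only shows how the nested pair is built and read back, and that the type parameters must be the boxed Long and Integer, since Java generics do not accept long and int.

```java
// Sketch only: stand-in types, NOT the Malhar or Kafka classes.
public class OffsetTupleSketch {

    /** Hypothetical stand-in for a MutablePair-like holder. */
    public static final class Pair<L, R> {
        public final L left;
        public final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }

    /**
     * Wraps a payload with the offset and partition id it came from,
     * mirroring the nested-pair shape proposed in the thread.
     * Long and Integer are used because generics cannot take primitives.
     */
    public static Pair<String, Pair<Long, Integer>> wrap(String payload, long offset, int partitionId) {
        return new Pair<>(payload, new Pair<>(offset, partitionId));
    }

    public static void main(String[] args) {
        Pair<String, Pair<Long, Integer>> tuple = wrap("hello", 42L, 3);
        // Downstream code reads the payload from the outer pair and the
        // offset and partition id from the inner pair.
        System.out.println("payload=" + tuple.left
            + " offset=" + tuple.right.left
            + " partition=" + tuple.right.right);
    }
}
```

In a real operator the outer left value would be the Kafka message and the pair would be emitted on the output port; this sketch leaves both out so that it compiles on its own.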

-Priyanka
On Jun 17, 2016 8:03 PM, "Raja.Aravapalli" <[email protected]>
wrote:

>
> Hi Priyanka,
>
> Can this be done from a class outside the package “
> com.datatorrent.contrib.kafka;” ?
>
> I don’t want to disturb the source :(
>
>
>
> Regards,
> Raja.
>
> From: "Raja.Aravapalli" <[email protected]>
> Date: Friday, June 17, 2016 at 5:38 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: Kafka input operator
>
>
> Hi Priyanka,
>
> I am using kafka version 0.8.x.
>
> Awesome. Yes, this is what I want. I shall test this and share my
> updates. Having a Kafka operator like this in Malhar would be very good.
> I don’t see anything like it in Storm either!!
>
>
>
> Regards,
> Raja.
>
> From: Priyanka Gugale <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Friday, June 17, 2016 at 2:05 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: Kafka input operator
>
> Hi Raja,
>
> I have quickly written an operator to fulfill your requirement. The code is
> available here
> <https://github.com/apache/apex-malhar/compare/master...DT-Priyanka:Kafka-input-updates>.
> Let me know if this addresses your use case.
>
> -Priyanka
>
> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale <
> [email protected]> wrote:
>
>> Hi Raja,
>>
>> You will need to update other places as well (I guess replay is the other
>> one besides emitTuples). But I think it is not feasible to replicate the
>> emitTuples code in a subclass, as many of the parent class variables are
>> private. I will try to figure out if there is any other way.
>>
>> Can you please confirm which Kafka version you are using?
>>
>> -Priyanka
>>
>> On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>> Hi Chaitanya,
>>>
>>> Would the changes you proposed below be enough to retrieve the partition &
>>> offset?
>>>
>>> I see *emitTuple(Message msg)* is being called at various places in the
>>> code… please advise. Thank you.
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "Raja.Aravapalli" <[email protected]>
>>> Date: Tuesday, June 14, 2016 at 9:50 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: Kafka input operator
>>>
>>>
>>> Thanks for the response Chaitanya. I will follow the suggestions to
>>> retrieve Kafka partitionId & offset!!
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: Chaitanya Chebolu <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Monday, June 13, 2016 at 3:06 AM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: Kafka input operator
>>>
>>> Hi Raja,
>>>
>>>    I think you are using the 0.8 version of the Kafka operator. There is no
>>> such operator in Malhar.  To meet your requirement, please do as below:
>>>
>>>   Create a new class which extends AbstractKafkaInputOperator.
>>> Override the API "void emitTuples()" and create an output port of type
>>> MutablePair<Message,MutablePair<Long,Integer>>
>>>
>>> Copy the emitTuples() from AbstractKafkaInputOperator and change the
>>> below line:
>>> emitTuple(message.msg) to
>>> outputPort.emit(new MutablePair<>(message.getMsg(),new
>>> MutablePair<>(message.getOffset(),message.getKafkaPart().getPartitionId())));
>>>
>>> Regards,
>>> Chaitanya
>>>
>>>
>>> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravapalli <
>>> [email protected]> wrote:
>>>
>>>>
>>>> Hi
>>>>
>>>> Does anyone know whether any of the existing Kafka input operators
>>>> can return the Kafka partition ID & offset a particular
>>>> message came from, along with the message?
>>>>
>>>>
>>>> Thanks a lot in advance.
>>>>
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>
>>>
>>
>
