Re: Datatorrent fault tolerance

2016-10-07 Thread hsy541
The KafkaSinglePortExactlyOnceOutputOperator takes whatever the previous 
operator emits and writes it to Kafka. 

Sent from my iPhone

> On Oct 7, 2016, at 07:59, Jaspal Singh  wrote:
> 
> Hi Thomas,
> 
> I have a question: when we use 
> KafkaSinglePortExactlyOnceOutputOperator to write results into a maprstream 
> topic, will it be able to read messages from the previous operator?
> 
> 
> Thanks
> Jaspal
> 
>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise  wrote:
>> For recovery you need to set the window data manager like so:
>> 
>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>> 
>> That will also apply to stateful restart of the entire application (relaunch 
>> from previous instance's checkpointed state).
>> 
>> For cold restart, you would need to consider the property you mention and 
>> decide what is applicable to your use case.
>> 
>> Thomas
>> 
>> 
>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh  
>>> wrote:
>>> Ok, now I get it. Thanks for the nice explanation!
>>> 
>>> One more thing: you mentioned checkpointing the offset ranges to replay 
>>> in the same order from Kafka. 
>>> 
>>> Is there any property we need to configure to do that, like initialOffset 
>>> set to APPLICATION_OR_LATEST?
>>> 
>>> 
>>> Thanks
>>> Jaspal
>>> 
>>> 
 On Thursday, October 6, 2016, Thomas Weise  wrote:
 What you want is the effect of exactly-once output (that's why we also call 
 it end-to-end exactly-once). There is no such thing as exactly-once 
 processing in a distributed system. In this case it would rather be 
 "produce exactly-once". Upstream operators, on failure, will recover to 
 checkpointed state and re-process the stream from there. This is 
 at-least-once, the default behavior. Because you have configured the input 
 operator to replay in the same order from Kafka (this is done by 
 checkpointing the offset ranges), the computation in the DAG is idempotent 
 and the output operator can discard the results that were already 
 published instead of producing duplicates. 
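 
 The discard-on-replay idea can be modelled in plain Java. This is only a toy 
 sketch, not the Apex operator: ExternalLog and DedupingWriter are hypothetical 
 names, the log stands in for a Kafka topic, and it assumes the replayed results 
 arrive in exactly the same order as before the failure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for an external log such as a Kafka topic (hypothetical). */
final class ExternalLog {
    private final List<String> records = new ArrayList<>();

    void append(String record) { records.add(record); }
    int size() { return records.size(); }
    List<String> records() { return new ArrayList<>(records); }
}

/** Output stage that discards replayed results which are already in the log. */
final class DedupingWriter {
    private final ExternalLog log;
    private int toSkip; // replayed results that were published before the failure

    DedupingWriter(ExternalLog log, int publishedAtCheckpoint) {
        this.log = log;
        // Everything in the log beyond the checkpointed count was written
        // after the checkpoint and before the crash, so it will be replayed.
        this.toSkip = log.size() - publishedAtCheckpoint;
    }

    void write(String result) {
        if (toSkip > 0) {
            toSkip--;       // duplicate produced by the replay, drop it
            return;
        }
        log.append(result); // new result, publish it
    }
}

final class ExactlyOnceSketch {
    public static void main(String[] args) {
        ExternalLog log = new ExternalLog();
        List<String> results = Arrays.asList("r1", "r2", "r3", "r4");

        // First attempt: checkpoint taken with 0 records published,
        // then the writer crashes after publishing two results.
        DedupingWriter first = new DedupingWriter(log, 0);
        first.write(results.get(0));
        first.write(results.get(1));

        // Recovery: upstream replays everything since the checkpoint, in order.
        DedupingWriter recovered = new DedupingWriter(log, 0);
        for (String r : results) {
            recovered.write(r);
        }

        System.out.println(log.records()); // each result appears once
    }
}
```

 The skip count is only meaningful because the replay is deterministic; if the 
 upstream order could change, the writer could not tell which results were 
 already out.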
 
> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh  
> wrote:
> I think this is a customized operator implementation that takes care of 
> exactly-once processing at the output.
> 
> What if any of the previous operators fail? How can we make sure they also 
> recover using the EXACTLY_ONCE processing mode?
> 
> 
> Thanks
> Jaspal
> 
> 
>> On Thursday, October 6, 2016, Thomas Weise  
>> wrote:
>> In that case please have a look at:
>> 
>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>> 
>> The operator will ensure that messages are not duplicated, under the 
>> stated assumptions.
>> 
>> 
>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh 
>>>  wrote:
>>> Hi Thomas,
>>> 
>>> In our case we are writing the results back to maprstreams topic based 
>>> on some validations.
>>> 
>>> 
>>> Thanks
>>> Jaspal
>>> 
>>> 
 On Thursday, October 6, 2016, Thomas Weise  wrote:
 Hi,
 
 which operators in your application are writing to external systems?
 
 When you look at the example from the blog 
 (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
  there is Kafka input, which is configured to be idempotent. The 
 results are written to JDBC. That operator by itself supports 
 exactly-once through transactions (in conjunction with idempotent 
 input), hence there is no need to configure the processing mode at all.
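 
 One common way to get this effect with a transactional store is to commit the 
 window id in the same transaction as the data and to skip any replayed window 
 at or below it. The sketch below is an in-memory model under that assumption; 
 ToyStore and TransactionalWriter are hypothetical names, not classes from the 
 example or from Malhar.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory stand-in for a data table plus a one-row metadata table (hypothetical). */
final class ToyStore {
    private final List<String> rows = new ArrayList<>();
    private long committedWindowId = -1;

    /** Applies the rows and the window id together, modelling one transaction. */
    synchronized void commit(long windowId, List<String> batch) {
        rows.addAll(batch);
        committedWindowId = windowId;
    }

    synchronized long committedWindowId() { return committedWindowId; }
    synchronized List<String> rows() { return new ArrayList<>(rows); }
}

/** Writes one window at a time and ignores windows that were already committed. */
final class TransactionalWriter {
    private final ToyStore store;

    TransactionalWriter(ToyStore store) { this.store = store; }

    /** Returns true if the window was written, false if it was a replayed duplicate. */
    boolean writeWindow(long windowId, List<String> batch) {
        if (windowId <= store.committedWindowId()) {
            return false; // already committed before the failure
        }
        store.commit(windowId, batch);
        return true;
    }
}

final class TransactionalSketch {
    public static void main(String[] args) {
        ToyStore store = new ToyStore();

        // Windows 1 and 2 are committed, then the operator fails.
        TransactionalWriter writer = new TransactionalWriter(store);
        writer.writeWindow(1, Arrays.asList("a"));
        writer.writeWindow(2, Arrays.asList("b"));

        // After recovery the idempotent input replays windows 1..3 with the same content.
        TransactionalWriter recovered = new TransactionalWriter(store);
        recovered.writeWindow(1, Arrays.asList("a"));
        recovered.writeWindow(2, Arrays.asList("b"));
        recovered.writeWindow(3, Arrays.asList("c"));

        System.out.println(store.rows()); // no row is duplicated
    }
}
```

 This only works if a replayed window contains the same tuples as the first 
 time, which is what the idempotent input provides.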
 
 Thomas
 
 
>> 
> 


Re: Kafka input operator

2016-06-19 Thread hsy541
The Pair classes in Apache Commons are not Kryo serializable. You can use 
another pair data structure, for example KeyValPair in the Malhar library. 
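
The stack trace quoted below complains that kafka.message.Message has no 
no-arg constructor, so another option is to copy what you need out of the 
message into a small tuple class of your own. A minimal sketch, assuming the 
payload is carried as raw bytes; KafkaRecord and its fields are hypothetical 
names, not part of Malhar or Kafka:

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical tuple: message bytes plus the Kafka partition and offset they came from. */
final class KafkaRecord {
    private byte[] payload;
    private long offset;
    private int partitionId;

    /** No-arg constructor so a reflective serializer can instantiate the class. */
    KafkaRecord() {
        this.payload = new byte[0];
    }

    KafkaRecord(byte[] payload, long offset, int partitionId) {
        this.payload = payload.clone(); // defensive copy of the message bytes
        this.offset = offset;
        this.partitionId = partitionId;
    }

    byte[] getPayload() { return payload.clone(); }
    long getOffset() { return offset; }
    int getPartitionId() { return partitionId; }
}

final class KafkaRecordDemo {
    public static void main(String[] args) {
        KafkaRecord record =
            new KafkaRecord("hello".getBytes(StandardCharsets.UTF_8), 42L, 3);
        System.out.println(new String(record.getPayload(), StandardCharsets.UTF_8)
            + " @ partition " + record.getPartitionId()
            + ", offset " + record.getOffset());
    }
}
```

Only plain fields travel between operators this way, so the serializer never 
has to construct a Kafka class.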

Siyuan

Sent from my iPhone

> On Jun 19, 2016, at 14:58, Raja.Aravapalli  wrote:
> 
> 
> Hi Priyanka, 
> 
> I am trying to read the messages in the next operator, with the input port 
> defined as below: 
> 
> public transient DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>> input = new DefaultInputPort<MutablePair<Message, MutablePair<Long, Integer>>>()
> 
> Application is failing with below exception:
> 
> 2016-06-19 16:54:45,498 ERROR codec.DefaultStatefulStreamCodec 
> (DefaultStatefulStreamCodec.java:fromDataStatePair(98)) - Catastrophic Error: 
> Execution halted due to Kryo exception!
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing 
> no-arg constructor): kafka.message.Message
> Serialization trace:
> left (org.apache.commons.lang3.tuple.MutablePair)
>   at 
> com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>   at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>   at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
> 
> 
> Any help please. 
> 
> Regards, 
> Raja.
> 
> From: "Raja.Aravapalli" 
> Reply-To: "users@apex.apache.org" 
> Date: Sunday, June 19, 2016 at 12:22 AM
> To: "users@apex.apache.org" 
> Subject: Re: Kafka input operator
> 
> 
> Thanks for the response Priyanka…
> 
> But when I try to put it in my own package, some of the protected variables 
> are not accessible. 
> 
> 
> Regards,
> Raja.
> 
> From: Priyanka Gugale 
> Reply-To: "users@apex.apache.org" 
> Date: Saturday, June 18, 2016 at 10:29 AM
> To: "users@apex.apache.org" 
> Subject: Re: Kafka input operator
> 
> Hi,
> 
> Yes, sure, you can use any package name you want. In fact, it is better to put 
> this class outside the Malhar jar. Just keep the Malhar jar on your classpath.
> 
> -Priyanka
> 
>> On Jun 17, 2016 8:03 PM, "Raja.Aravapalli"  
>> wrote:
>> 
>> Hi Priyanka, 
>> 
>> Can this be done from a class outside the package 
>> “com.datatorrent.contrib.kafka;” ?
>> 
>> I don’t want to disturb the source :(
>> 
>> 
>> 
>> Regards, 
>> Raja.
>> 
>> From: "Raja.Aravapalli" 
>> Date: Friday, June 17, 2016 at 5:38 AM
>> To: "users@apex.apache.org" 
>> Subject: Re: Kafka input operator
>> 
>> 
>> Hi Priyanka, 
>> 
>> I am using kafka version 0.8.x.
>> 
>> Awesome. Yes, this is what I want. I shall test this and share my updates. 
>> Having a Kafka operator like this in Malhar would be very good. I 
>> don’t see anything like it in Storm either!
>> 
>> 
>> 
>> Regards, 
>> Raja.
>> 
>> From: Priyanka Gugale 
>> Reply-To: "users@apex.apache.org" 
>> Date: Friday, June 17, 2016 at 2:05 AM
>> To: "users@apex.apache.org" 
>> Subject: Re: Kafka input operator
>> 
>> Hi Raja,
>> 
>> I have quickly written an operator to fulfill your requirement. The code is 
>> available here. Let me know if this addresses your use case.
>> 
>> -Priyanka
>> 
>>> On Fri, Jun 17, 2016 at 11:32 AM, Priyanka Gugale 
>>>  wrote:
>>> Hi Raja,
>>> 
>>> You will need to update other places as well (I guess replay, in addition 
>>> to emitTuples). But I think it is not feasible to replicate the emitTuples 
>>> code in a subclass, as many of the parent class variables are private. I 
>>> will try to figure out if there is any other way.
>>> 
>>> Can you please confirm which Kafka version you are using?
>>> 
>>> -Priyanka
>>> 
 On Thu, Jun 16, 2016 at 8:39 PM, Raja.Aravapalli 
  wrote:
 
 Hi Chaitanya, 
 
 Would the changes you proposed below be enough to retrieve the partition & 
 offset?
 
 I see emitTuple(Message msg) is being called at various places in the 
 code… please advise. Thank you.
 
 
 Regards,
 Raja.
 
 From: "Raja.Aravapalli" 
 Date: Tuesday, June 14, 2016 at 9:50 PM
 To: "users@apex.apache.org" 
 Subject: Re: Kafka input operator
 
 
 Thanks for the response Chaitanya. I will follow the suggestions to 
 retrieve Kafka partitionId & offset!! 
 
 
 Regards,
 Raja.
 
 From: Chaitanya Chebolu 
 Reply-To: "users@apex.apache.org" 
 Date: Monday, June 13, 2016 at 3:06 AM
 To: "users@apex.apache.org" 
 Subject: Re: Kafka input operator
 
 Hi Raja,
 
 I think you are using the 0.8 version of the Kafka operator. There is no such 
 operator in Malhar. To meet your requirement, please do as below:
 
 Create a new class which extends AbstractKafkaInputOperator. 
 Override the API "void emitTuples()" and create an output port of type 
 MutablePair<Message, MutablePair<Long, Integer>>.
 
 Copy emitTuples() from AbstractKafkaInputOperator and change the line
 emitTuple(message.msg)
 to
 outputPort.emit(new MutablePair<>(message.getMsg(),
     new MutablePair<>(message.getOffset(), message.getKafkaPart().getPartitionId())));
 
 Regards,
 Chaitanya
 
 
> On Sat, Jun 11, 2016 at 7:56 PM, Raja.Aravap