The Kafka exactly-once output operator assumes that partial window data
will arrive in the same window after recovery. That assumption is a bug
and needs to be fixed. It does not affect the Kafka 0.11 output operator,
which uses a different mechanism to achieve exactly-once.
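The failure mode described in the thread below can be reproduced without Kafka at all. The following is a minimal simulation (all class and variable names are hypothetical, for illustration only) of an output operator that, on recovery, rebuilds the set of tuples it already wrote for the partially processed window and then expects that same window id to replay those tuples:

```python
# Simulation of the exactly-once check in the pre-0.11 Kafka output operator.
# On recovery the operator rebuilds the set of tuples it already wrote for the
# partially processed window, then expects the SAME window id to replay those
# tuples. If the input operator shifts the data to a later window id, the
# check fires. All names here are hypothetical.

class ExactlyOnceOutput:
    def __init__(self, partial_window_id, already_written):
        self.partial_window_id = partial_window_id
        self.pending = set(already_written)  # tuples written before the crash
        self.window_tuples = []

    def begin_window(self, window_id):
        self.current_window = window_id
        self.window_tuples = []

    def process(self, tup):
        self.window_tuples.append(tup)

    def end_window(self):
        if self.current_window == self.partial_window_id:
            # The operator assumes the partial data arrives in the same window.
            if not self.pending.issubset(self.window_tuples):
                raise RuntimeError("Violates Exactly once. Not all the "
                                   "tuples received after operator reset.")
            self.pending.clear()

def replay(operator, windows):
    """windows: list of (window_id, [tuples]) replayed after recovery."""
    for window_id, tuples in windows:
        operator.begin_window(window_id)
        for t in tuples:
            operator.process(t)
        operator.end_window()

# Before the crash, tuple id=145 was seen in window ..24. After recovery the
# input operator emits 0 tuples for ..24 and ..25 and moves the data to ..26,
# so the subset check on ..24 fails -- exactly the error in the logs below.
op = ExactlyOnceOutput(partial_window_id=824, already_written={"id=145"})
try:
    replay(op, [(823, ["id=97"]), (824, []), (825, []), (826, ["id=145"])])
    outcome = "ok"
except RuntimeError:
    outcome = "violation"
print(outcome)
```

The same replay with id=145 delivered in window 824 passes the check, which is what the operator's assumption requires.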

On Wed, Mar 21, 2018 at 11:04 AM Pramod Immaneni <pra...@datatorrent.com>
wrote:

> Any window that was not complete when the operator died is, by definition,
> not replayed (we don't have all of the data in that window), and output
> operators should not expect it to be. In your case, if the operator died
> during window ..24, then on restart the input operator, with its data
> manager, will replay all windows from the checkpoint up to and including
> the window prior to the failure, in an idempotent fashion, but not the
> window in which the failure happened. In idempotent replay, the window is
> the unit of replay: the exact data within each window is replayed, but
> ordering within a window is generally not guaranteed, because with
> partitioning the data can arrive at the output operators in a different
> order than in the previous run. The exactly-once output operators in the
> library typically understand and work with these semantics, so I am not
> sure why the Kafka output operator is reporting an exactly-once violation
> for an incomplete window. Maybe somebody who is well versed in the output
> operator code can comment?
>
> Thanks
>
> On Tue, Mar 20, 2018 at 6:16 PM, Vivek Bhide <vivek.bh...@target.com>
> wrote:
>
>> Hi Pramod,
>>
>> We did some more research by adding more logging to the KafkaInput
>> operator, and below are our findings.
>>
>> Application Setup:
>> 1. The WindowDataManager for the KafkaInputOperator is set to
>> FSWindowDataManager
>> 2. The streaming window is increased from 0.5 seconds to 5 seconds to make
>> the issue easier to reproduce
>> 3. Two custom classes subclass the input and output operators, solely to
>> add debug logging
>>
>> Logs for KafkaIn before operator failure:
>> --------------------------------------------
>> 2018-03-20 19:36:49,494 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
>> processed in window 6535189514237771822 : 48
>> 2018-03-20 19:36:49,599 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
>> window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
>> 2018-03-20 19:36:54,496 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
>> processed in window 6535189514237771823 : 48
>> 2018-03-20 19:36:54,578 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
>> window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
>>
>> Logs of the KafkaIn after recovery:
>> ------------------------------------
>> CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
>> processed in window 6535189514237771822 : 48
>> 2018-03-20 19:37:06,664 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
>> window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
>> 2018-03-20 19:37:06,665 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
>> processed in window 6535189514237771823 : 48
>> 2018-03-20 19:37:06,720 INFO  util.AsyncFSStorageAgent
>> (AsyncFSStorageAgent.java:save(91)) - using
>>
>> /grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331
>> as the basepath for checkpointing.
>> 2018-03-20 19:37:06,727 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
>> processed in window 6535189514237771824 : 0
>> 2018-03-20 19:37:06,768 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
>> processed in window 6535189514237771825 : 0
>> 2018-03-20 19:37:06,810 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortInputOperator
>> (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
>> window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
>>
>> Logs of the KafkaOutput operator :
>> -----------------------------------
>>
>> 2018-03-20 19:37:06,616 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
>> (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
>> Current Window : 6535189514237771822
>> 2018-03-20 19:37:06,617 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
>>
>> (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203))
>> - Rebuild the partial window after 6535189514237771823
>> 2018-03-20 19:37:07,943 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
>>
>> (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304))
>> - Partitial Window tuples :
>> {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147,
>> name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
>> name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
>> name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
>> name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
>> 2018-03-20 19:37:07,944 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
>> (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
>> Current Window : 6535189514237771823
>> 2018-03-20 19:37:07,944 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
>> (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
>> Current Window : 6535189514237771824
>> 2018-03-20 19:37:07,945 INFO
>> kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
>> (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) -
>> Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM,
>> randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
>> name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
>> name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
>> name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
>> 2018-03-20 19:37:07,946 ERROR engine.StreamingContainer
>> (StreamingContainer.java:run(1456)) - Operator set
>>
>> [OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029,
>> 0,
>>
>> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
>> stopped running due to an exception.
>> java.lang.RuntimeException: Violates Exactly once. Not all the tuples
>> received after operator reset.
>>         at
>>
>> com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
>>         at
>>
>> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
>>         at
>> com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>>         at
>>
>> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
>> 2018-03-20 19:37:07,964 INFO  producer.KafkaProducer
>> (KafkaProducer.java:close(613)) - Closing the Kafka producer with
>> timeoutMillis = 9223372036854775807 ms.
>> 2018-03-20 19:37:08,515 INFO  engine.StreamingContainer
>> (StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy
>> request:
>> [3]
>>
>>
>> If you look at the KafkaIn logs from before and after the failure, the
>> last window the operator fully processed is 6535189514237771823, and it
>> was killed while processing 6535189514237771824. You can also see that
>> the first tuple of window 6535189514237771824 is
>> {"id":145,"name":"RWOSFMVV0MY7OIXGV2XD"}... rather, is
>> {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
>> When the operator recovers, it correctly replays the tuples up to and
>> including 6535189514237771823, but then emits 0 tuples for window ids
>> 6535189514237771824 and 6535189514237771825, and finally emits all of the
>> accumulated tuples in 6535189514237771826, with the first tuple again
>> being {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
>>
>> As per my understanding, this is not idempotent behavior, since the
>> window-to-tuple assignment from before the failure changed after
>> recovery. Please correct me if I am wrong. We believe this is what causes
>> the output operator's failure: it correctly rebuilds the partially
>> processed window 6535189514237771824 (please refer to the logs), but then
>> never receives those tuples in that window again. We also verified this
>> by adding a consumer on the output topic.
>>
>> Could you please confirm whether this is an issue that needs a fix, and
>> suggest one if possible?
>>
>> Regards
>> Vivek
>>
>>
>>
>> --
>> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
>>
>
>
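One possible direction for a fix on the output side, sketched as a simulation rather than against the actual Apex operator API (all names hypothetical): instead of pinning the rebuilt partial-window tuples to one specific window id, deduplicate replayed tuples against the rebuilt set wherever they reappear.

```python
# Hypothetical sketch of a fix: deduplicate against the rebuilt
# partial-window set in whichever window the tuples reappear, instead of
# requiring them to arrive under the original window id.

class DedupOutput:
    def __init__(self, already_written):
        self.pending = set(already_written)  # rebuilt from Kafka on restart
        self.written = []                    # stands in for the Kafka producer

    def process(self, tup):
        if tup in self.pending:
            # Already in the output topic from before the crash: drop it once.
            self.pending.discard(tup)
            return
        self.written.append(tup)

op = DedupOutput(already_written={"id=145"})
for tup in ["id=97", "id=145", "id=146"]:  # replayed under any window ids
    op.process(tup)
print(op.written)  # ['id=97', 'id=146'] -- id=145 is skipped exactly once
```

A set only works if tuples are unique; the real operator would need per-tuple counts (or offsets) to handle duplicate values within the partial window. This is a sketch of the idea, not a drop-in patch.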
