The Kafka exactly-once output operator assumes that the data from a partial window will arrive in the same window after recovery. This is a bug that needs to be fixed. It doesn't affect Kafka 0.11, as a different mechanism is used to achieve exactly-once there.
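To make the problem concrete, here is a rough sketch of the kind of check that would not depend on that assumption. It is not the actual operator code or a proposed patch: the class `PartialWindowTracker` and its methods are hypothetical names, tuples are simplified to strings, and the ids 150 and 151 are made up for illustration. The idea is that tuples already written during the failed window are filtered out whenever they reappear, instead of requiring all of them to show up in the first window after recovery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not an Apex Malhar class): remembers the tuples that were
 * already written to Kafka during the window in which the operator died, and
 * filters them out whenever they reappear, in whichever later window that is.
 */
class PartialWindowTracker {
    // tuple -> number of copies already written during the failed window
    private final Map<String, Integer> alreadyWritten = new HashMap<>();

    PartialWindowTracker(List<String> partialWindowTuples) {
        for (String tuple : partialWindowTuples) {
            alreadyWritten.merge(tuple, 1, Integer::sum);
        }
    }

    /** Returns true if the tuple still has to be sent, false if it is a duplicate. */
    boolean shouldSend(String tuple) {
        Integer copies = alreadyWritten.get(tuple);
        if (copies == null) {
            return true;
        }
        if (copies == 1) {
            alreadyWritten.remove(tuple);
        } else {
            alreadyWritten.put(tuple, copies - 1);
        }
        return false;
    }

    /** True once every previously written tuple has been seen again. */
    boolean isDrained() {
        return alreadyWritten.isEmpty();
    }

    public static void main(String[] args) {
        // Tuples written during the failed window (ids 145-149 as in the logs below).
        PartialWindowTracker tracker =
            new PartialWindowTracker(Arrays.asList("145", "146", "147", "148", "149"));

        // After recovery: two empty windows, then a window carrying the accumulated
        // tuples. Ids 150 and 151 are illustrative only.
        List<List<String>> windows = Arrays.asList(
            new ArrayList<String>(),
            new ArrayList<String>(),
            Arrays.asList("145", "146", "147", "148", "149", "150", "151"));

        for (List<String> window : windows) {
            List<String> sent = new ArrayList<>();
            for (String tuple : window) {
                if (tracker.shouldSend(tuple)) {
                    sent.add(tuple);
                }
            }
            // An empty window is not treated as a violation; the tracker stays armed.
            System.out.println("sent=" + sent + " drained=" + tracker.isDrained());
        }
    }
}
```

Matching by tuple value like this only works if duplicates can be recognised from the tuple itself, so treat it as an illustration of the idea rather than a drop-in fix.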
On Wed, Mar 21, 2018 at 11:04 AM Pramod Immaneni <pra...@datatorrent.com> wrote:

> Any window that was not complete by the time the operator died is not
> replayed, by definition (as we don't have all the data in the window), and
> the output operators should not expect that either. In your case, if the
> operator died during window ..24, then on restart you can expect that the
> input operator with the data manager will replay all windows from the
> checkpoint up to and including the window prior to the failure, in an
> idempotent fashion, but not the window during which the failure happened.
> Also, in idempotent replay the window is treated as the replay unit: the
> exact data within each window is replayed, but order is generally not
> guaranteed, because with partitioning the data can arrive at the output
> operators in a different order than in the previous run. Typically the
> output operators in the library that do exactly-once understand and work
> with these definitions, so I am not sure exactly why the Kafka output
> operator is reporting an exactly-once violation for an incomplete window.
> Maybe somebody who is well versed in the output operator code can comment?
>
> Thanks
>
> On Tue, Mar 20, 2018 at 6:16 PM, Vivek Bhide <vivek.bh...@target.com>
> wrote:
>
>> Hi Pramod,
>>
>> We did some more research by adding more logging to the KafkaInput
>> operator, and below are our findings.
>>
>> Application Setup:
>> 1. WindowDataManager for KafkaInputOperator is set to FSWindowDataManager
>> 2. Streaming window for the application is set to 5 seconds (up from 0.5
>>    seconds) to make the issue easy to reproduce
>> 3. Created 2 custom classes for the Input and Output operators, only for
>>    the purpose of adding debug logs
>>
>> Logs for KafkaIn before operator failure :
>> --------------------------------------------
>> 2018-03-20 19:36:49,494 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
>> 2018-03-20 19:36:49,599 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
>> 2018-03-20 19:36:54,496 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
>> 2018-03-20 19:36:54,578 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
>>
>> Logs of the KafkaIn after recovery :
>> ------------------------------------
>> CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
>> 2018-03-20 19:37:06,664 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
>> 2018-03-20 19:37:06,665 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
>> 2018-03-20 19:37:06,720 INFO util.AsyncFSStorageAgent (AsyncFSStorageAgent.java:save(91)) - using /grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331 as the basepath for checkpointing.
>> 2018-03-20 19:37:06,727 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771824 : 0
>> 2018-03-20 19:37:06,768 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771825 : 0
>> 2018-03-20 19:37:06,810 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
>>
>> Logs of the KafkaOutput operator :
>> -----------------------------------
>>
>> 2018-03-20 19:37:06,616 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771822
>> 2018-03-20 19:37:06,617 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203)) - Rebuild the partial window after 6535189514237771823
>> 2018-03-20 19:37:07,943 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304)) - Partitial Window tuples : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
>> 2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771823
>> 2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771824
>> 2018-03-20 19:37:07,945 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) - Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
>> 2018-03-20 19:37:07,946 ERROR engine.StreamingContainer (StreamingContainer.java:run(1456)) - Operator set [OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>> java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
>>     at com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
>>     at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
>>     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
>> 2018-03-20 19:37:07,964 INFO producer.KafkaProducer (KafkaProducer.java:close(613)) - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-03-20 19:37:08,515 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request: [3]
>>
>> If you compare the KafkaIn logs from before and after the failure, the
>> last window that the operator fully processed is 6535189514237771823, and
>> it got killed while processing 6535189514237771824. You can also see that
>> the first tuple of window 6535189514237771824 is
>> {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
>> When the operator recovers, it replays the tuples correctly up to window
>> 6535189514237771823, but then it sends 0 tuples for windows
>> 6535189514237771824 and 6535189514237771825, and then sends all the
>> accumulated tuples in window 6535189514237771826, with the first tuple
>> being {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
>>
>> As per my understanding, this is not idempotent behavior, since the
>> assignment of tuples to windows changed between the run before the
>> failure and the run after recovery. Please correct me if I am wrong. We
>> believe this is what causes the failure in the output operator, because
>> we can see that it correctly rebuilds the partially processed window
>> 6535189514237771824 (please refer to the logs). We also verified this by
>> adding a consumer on the output topic.
>>
>> Could you please confirm whether this is an issue that needs a fix, and
>> suggest one if possible?
>>
>> Regards
>> Vivek
>>
>> --
>> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/