Hi Pramod,

We did some more research by adding more logging to the KafkaInput operator, and below are our findings.
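To make precise what we mean by idempotent replay in the analysis of the logs below, here is a minimal Java sketch of the check we are applying to them by hand. WindowReplayChecker and its methods are hypothetical names for illustration only, not Apex or Malhar APIs. The sketch assumes two rules. First, a window the input operator completed before the failure must be replayed with exactly the same tuples. Second, for the window that was in progress at the failure, the tuples already emitted must be replayed first under the same window id, since that is what the output operator's partial-window rebuild relies on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical debugging helper (not an Apex or Malhar API). It records which
 * tuples an input operator emits in each window before a failure and after
 * recovery, and reports whether the replay was idempotent.
 */
public class WindowReplayChecker {
  private final Map<Long, List<String>> beforeFailure = new HashMap<>();
  private final Map<Long, List<String>> afterRecovery = new HashMap<>();
  // Windows whose endWindow() was reached before the failure.
  private final Set<Long> completedBeforeFailure = new HashSet<>();

  public void emittedBeforeFailure(long windowId, String tuple) {
    beforeFailure.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
  }

  public void endWindowBeforeFailure(long windowId) {
    completedBeforeFailure.add(windowId);
  }

  public void emittedAfterRecovery(long windowId, String tuple) {
    afterRecovery.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
  }

  /**
   * A completed window must be replayed with exactly the same tuples. For the
   * window that was in progress at the failure, the tuples already emitted
   * must be a prefix of what is replayed under the same window id.
   */
  public boolean isIdempotent() {
    Set<Long> windows = new HashSet<>(beforeFailure.keySet());
    windows.addAll(completedBeforeFailure);
    for (long windowId : windows) {
      List<String> expected = beforeFailure.getOrDefault(windowId, Collections.<String>emptyList());
      List<String> actual = afterRecovery.getOrDefault(windowId, Collections.<String>emptyList());
      boolean ok;
      if (completedBeforeFailure.contains(windowId)) {
        ok = actual.equals(expected);
      } else {
        ok = actual.size() >= expected.size()
            && actual.subList(0, expected.size()).equals(expected);
      }
      if (!ok) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    WindowReplayChecker checker = new WindowReplayChecker();
    // Before the failure (one tuple per window for brevity; the real windows held 48).
    checker.emittedBeforeFailure(6535189514237771823L, "{\"id\":97}");
    checker.endWindowBeforeFailure(6535189514237771823L);
    checker.emittedBeforeFailure(6535189514237771824L, "{\"id\":145}"); // killed mid-window
    // After recovery: ...823 matches, ...824 and ...825 are empty, id 145 arrives in ...826.
    checker.emittedAfterRecovery(6535189514237771823L, "{\"id\":97}");
    checker.emittedAfterRecovery(6535189514237771826L, "{\"id\":145}");
    System.out.println(checker.isIdempotent()); // prints false
  }
}
```

Applied to our logs, window 6535189514237771823 passes the check, but 6535189514237771824 fails because the tuple with id 145 comes back in window 6535189514237771826 instead.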
Application setup:
1. The WindowDataManager for KafkaInputOperator is set to FSWindowDataManager.
2. The application's streaming window was increased from 0.5 seconds to 5 seconds to make the issue easier to reproduce.
3. We created 2 custom classes, one for the input operator and one for the output operator, solely to add debug logging.

Logs for KafkaIn before operator failure:
--------------------------------------------
2018-03-20 19:36:49,494 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
2018-03-20 19:36:49,599 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:36:54,496 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
2018-03-20 19:36:54,578 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs for KafkaIn after recovery:
------------------------------------
CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
2018-03-20 19:37:06,664 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:37:06,665 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
2018-03-20 19:37:06,720 INFO util.AsyncFSStorageAgent (AsyncFSStorageAgent.java:save(91)) - using /grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331 as the basepath for checkpointing.
2018-03-20 19:37:06,727 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771824 : 0
2018-03-20 19:37:06,768 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771825 : 0
2018-03-20 19:37:06,810 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs of the KafkaOutput operator:
-----------------------------------
2018-03-20 19:37:06,616 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771822
2018-03-20 19:37:06,617 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203)) - Rebuild the partial window after 6535189514237771823
2018-03-20 19:37:07,943 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304)) - Partitial Window tuples : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771823
2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771824
2018-03-20 19:37:07,945 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) - Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,946 ERROR engine.StreamingContainer (StreamingContainer.java:run(1456)) - Operator set [OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
    at com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
    at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
2018-03-20 19:37:07,964 INFO producer.KafkaProducer (KafkaProducer.java:close(613)) - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-20 19:37:08,515 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request: [3]

Comparing the KafkaIn logs before and after the failure: the last window the operator completed was 6535189514237771823, and it was killed while processing 6535189514237771824. The first tuple of window 6535189514237771824 was {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}. When the operator recovers, it replays the tuples correctly up to window 6535189514237771823, but then it emits 0 tuples for windows 6535189514237771824 and 6535189514237771825, and emits all of the accumulated tuples in window 6535189514237771826, whose first tuple is {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.

As I understand it, this is not idempotent behavior, because the assignment of tuples to windows after recovery differs from the assignment before the failure. Please correct me if I am wrong.

We believe this is what causes the output operator to fail. The output operator recovers correctly with the partially processed window 6535189514237771824: it rebuilds that window from the tuples already written to the output topic (ids 145 to 149, per the rebuildPartialWindow log) and expects to receive them again in that window. Because the input operator sends 0 tuples for 6535189514237771824, endWindow throws "Violates Exactly once. Not all the tuples received after operator reset." (please refer to the logs above). We also verified this by adding a consumer on the output topic.

Could you please confirm whether this is an issue that needs a fix, and suggest one if possible?

Regards,
Vivek

-- 
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/