Hi Pramod,

We investigated further by adding more logging to the KafkaInput operator;
our findings are below.

Application Setup: 
1. WindowDataManager for KafkaInputOperator is set to FSWindowDataManager
2. Streaming window for the application is increased from 0.5 seconds to
5 seconds to make the issue easier to reproduce
3. Created 2 custom classes, one each for the input and output operators,
solely to add debugging logs

Logs for KafkaIn before operator failure :
--------------------------------------------
2018-03-20 19:36:49,494 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771822 : 48
2018-03-20 19:36:49,599 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:36:54,496 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771823 : 48
2018-03-20 19:36:54,578 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs for KafkaIn after recovery :
------------------------------------
CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771822 : 48
2018-03-20 19:37:06,664 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:37:06,665 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771823 : 48
2018-03-20 19:37:06,720 INFO  util.AsyncFSStorageAgent
(AsyncFSStorageAgent.java:save(91)) - using
/grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331
as the basepath for checkpointing.
2018-03-20 19:37:06,727 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771824 : 0
2018-03-20 19:37:06,768 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771825 : 0
2018-03-20 19:37:06,810 INFO 
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs of the KafkaOutput operator :
-----------------------------------

2018-03-20 19:37:06,616 INFO 
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
Current Window : 6535189514237771822
2018-03-20 19:37:06,617 INFO 
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203))
- Rebuild the partial window after 6535189514237771823
2018-03-20 19:37:07,943 INFO 
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304))
- Partitial Window tuples : 
{id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147,
name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,944 INFO 
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
Current Window : 6535189514237771823
2018-03-20 19:37:07,944 INFO 
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
Current Window : 6535189514237771824
2018-03-20 19:37:07,945 INFO 
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) -
Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM,
randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,946 ERROR engine.StreamingContainer
(StreamingContainer.java:run(1456)) - Operator set
[OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples
received after operator reset.
        at
com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
        at
com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
        at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
        at
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
2018-03-20 19:37:07,964 INFO  producer.KafkaProducer
(KafkaProducer.java:close(613)) - Closing the Kafka producer with
timeoutMillis = 9223372036854775807 ms.
2018-03-20 19:37:08,515 INFO  engine.StreamingContainer
(StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request:
[3]


Comparing the KafkaIn logs before and after the failure, the last window
the operator fully processed is 6535189514237771823, and it was killed while
processing 6535189514237771824. Before the failure, the first tuple of
window 6535189514237771824 is {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
When the operator recovers, it replays the tuples correctly up to window
6535189514237771823, but then it sends 0 tuples for windows
6535189514237771824 and 6535189514237771825 and sends all the accumulated
tuples in window 6535189514237771826, whose first tuple is
{"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
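To make the comparison concrete, here is a minimal sketch that checks whether
the tuple-to-window assignment after recovery matches the one before the
failure. The window ids and first-tuple ids are taken from the logs above; the
class and method names (WindowAssignmentCheck, firstDivergentWindow) are
hypothetical and not part of Apex or Malhar.

```java
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical helper, not an Apex/Malhar class.
final class WindowAssignmentCheck {

    // Maps are windowId -> id of the first tuple emitted in that window.
    // A window that emitted no tuples is simply absent from the map.
    // Returns the lowest window id whose first tuple differs after recovery.
    static OptionalLong firstDivergentWindow(Map<Long, Integer> before,
                                             Map<Long, Integer> after) {
        // Walk the pre-failure windows in ascending window-id order.
        for (Map.Entry<Long, Integer> e : new TreeMap<>(before).entrySet()) {
            if (!Objects.equals(e.getValue(), after.get(e.getKey()))) {
                return OptionalLong.of(e.getKey());
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Values copied from the KafkaIn logs before the failure.
        Map<Long, Integer> before = new TreeMap<>();
        before.put(6535189514237771823L, 97);
        before.put(6535189514237771824L, 145);

        // Values copied from the KafkaIn logs after recovery:
        // windows ...824 and ...825 emitted 0 tuples.
        Map<Long, Integer> after = new TreeMap<>();
        after.put(6535189514237771823L, 97);
        after.put(6535189514237771826L, 145);

        System.out.println(firstDivergentWindow(before, after));
    }
}
```

With the logged values, the first window whose assignment differs is
6535189514237771824, the window that was in flight when the operator failed.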

As per my understanding, this is not idempotent behavior, since the
assignment of tuples to windows after recovery differs from the assignment
before the failure. Please correct me if I am wrong. We believe this is
causing the failure in the output operator, because we can see that the
output operator itself recovers correctly with the partially processed
window 6535189514237771824 (please refer to the logs). We also verified
this by adding a consumer on the output topic.
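The following is a simplified model of why an empty replayed window would trip
the output operator. It is inferred only from the log messages and the
exception text above, not from the operator's source, and the names
(PartialWindowModel, process) are hypothetical. The assumption is that the
tuples already written for the partial window must all be seen again before
that window ends.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model; not the Malhar operator itself.
final class PartialWindowModel {

    // Tuple ids already written to the output topic for the partial window.
    private final Set<Integer> partial;

    PartialWindowModel(Collection<Integer> alreadyWritten) {
        this.partial = new HashSet<>(alreadyWritten);
    }

    // Returns true if the tuple was already written and should be skipped.
    boolean process(int id) {
        return partial.remove(id);
    }

    // Assumed check: every previously written tuple must have been replayed.
    void endWindow() {
        if (!partial.isEmpty()) {
            throw new IllegalStateException(
                "Not all partial-window tuples were replayed: " + partial);
        }
    }

    public static void main(String[] args) {
        // Ids from the "Partitial Window tuples" log line.
        PartialWindowModel m =
            new PartialWindowModel(Arrays.asList(145, 146, 147, 148, 149));
        try {
            // Window ...824 is replayed with 0 tuples, so nothing is removed.
            m.endWindow();
        } catch (IllegalStateException e) {
            System.out.println("failed as in the logs: " + e.getMessage());
        }
    }
}
```

Under this model, the check passes only if the input operator replays tuples
145 to 149 in window 6535189514237771824 again, which is what idempotent
replay would provide.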

Could you please confirm whether this is an issue that needs a fix, and
suggest one if possible?

Regards
Vivek



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
