This is a continuation of my previous question about KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end exactly-once processing, with data received from one Kafka topic and finally posted to another Kafka topic. As Pramod mentioned in one of the presentations, three things are needed to achieve this:
1. Idempotent input operator
2. Stateful operator recovery (something that Apex provides out of the box)
3. Action by the output operator (similar to what AbstractFileOutputOperator does)

I am using the KafkaSinglePortInputOperator from the org.apache.apex.malhar.kafka package, which is not idempotent by default. I made it idempotent as below:

    public abstract class CustomAbstractKafkaInputOperator extends AbstractKafkaInputOperator
    {
      @Override
      public void setup(OperatorContext context)
      {
        // Install a file-system-backed window data manager before the base setup runs.
        super.setWindowDataManager(new FSWindowDataManager());
        super.setup(context);
      }
    }

    public class CustomKafkaSinglePortInputOperator extends CustomAbstractKafkaInputOperator
    {
      public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();

      @Override
      public AbstractKafkaConsumer createConsumer(Properties properties)
      {
        return new KafkaConsumer09(properties);
      }

      @Override
      protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
      {
        // Emit only the record value; the key and metadata are dropped.
        outputPort.emit(message.value());
      }
    }

For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator from the org.apache.apex.malhar.kafka package. With reference to my previous question, I made sure that all the applicable conditions mentioned there are satisfied. Even so, I am receiving the error below:

    2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception. java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
        at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
        at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
        at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
     context: PTContainer[id=4(container_1521139241132_0003_01_000013),state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]

My questions are:

1. Is the way I made the input operator idempotent correct, or am I missing anything?
2. Do I need to make every operator in the pipeline idempotent to achieve this? My understanding is that I do not, because once the tuple-to-window mapping is established at the first operator, it does not change anywhere further down the pipeline.

Regards,
Vivek

--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/