This is a continuation of my previous question about
KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end
exactly-once processing, receiving data from one Kafka topic and finally
posting it to another Kafka topic. As Pramod mentioned in one of his
presentations, three things are needed to achieve this:

1. An idempotent input operator
2. Stateful operator recovery (something Apex provides out of the box)
3. Action by the output operator (similar to what AbstractFileOutputOperator
does)

I am using KafkaSinglePortInputOperator from the
org.apache.apex.malhar.kafka package, which is not idempotent by default.
I made it idempotent as follows:

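import java.util.Properties;

import org.apache.apex.malhar.kafka.AbstractKafkaConsumer;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaConsumer09;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;

// Base class that installs a file-system-backed window data manager
// before the parent setup runs, so windows can be replayed after a failure.
public abstract class CustomAbstractKafkaInputOperator extends AbstractKafkaInputOperator {

        @Override
        public void setup(OperatorContext context) {
                super.setWindowDataManager(new FSWindowDataManager());
                super.setup(context);
        }
}

// Concrete operator: Kafka 0.9 consumer, emits each record's value as raw bytes.
public class CustomKafkaSinglePortInputOperator extends CustomAbstractKafkaInputOperator {

        public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();

        @Override
        public AbstractKafkaConsumer createConsumer(Properties properties) {
                return new KafkaConsumer09(properties);
        }

        @Override
        protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
                outputPort.emit(message.value());
        }
}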
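
To check my understanding of what "idempotent" means here: the input operator should record which Kafka offsets it emitted in each window and, after a failure, re-emit exactly those offsets for the windows it had already emitted. I assume this is what FSWindowDataManager enables. Below is a minimal plain-Java model of that behavior, not Apex code. It uses no Apex or Kafka classes; `ReplayableInput`, `emitWindow`, and `resetTo` are hypothetical names, and the in-memory list stands in for one partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of an idempotent input operator (no Apex/Kafka classes).
 * For every window it logs the offset range it emitted; when a logged window
 * is processed again after recovery, it re-emits exactly that range,
 * regardless of how many records a fresh poll would have returned.
 */
public class ReplayableInput {
    private final List<String> partition;                        // stand-in for one Kafka partition
    private final Map<Long, long[]> windowLog = new HashMap<>(); // windowId -> {start, end) offsets; assumed to survive failures
    private long nextOffset = 0;                                 // consumer position

    public ReplayableInput(List<String> partition) {
        this.partition = partition;
    }

    /** Simulates restoring the consumer position from a checkpoint after a failure. */
    public void resetTo(long checkpointedOffset) {
        nextOffset = checkpointedOffset;
    }

    /** Emits one window; batchSize simulates how many records a poll happens to return. */
    public List<String> emitWindow(long windowId, int batchSize) {
        long[] range = windowLog.get(windowId);
        if (range == null) {
            // New window: consume fresh records and log the range before emitting.
            long end = Math.min(nextOffset + batchSize, partition.size());
            range = new long[] {nextOffset, end};
            windowLog.put(windowId, range);
        }
        // New or replayed window: emit exactly the logged range.
        List<String> out = new ArrayList<>();
        for (long offset = range[0]; offset < range[1]; offset++) {
            out.add(partition.get((int) offset));
        }
        nextOffset = range[1];
        return out;
    }

    public static void main(String[] args) {
        List<String> partition = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            partition.add("r" + i);
        }
        ReplayableInput input = new ReplayableInput(partition);
        System.out.println(input.emitWindow(1, 3)); // [r0, r1, r2]
        System.out.println(input.emitWindow(2, 4)); // [r3, r4, r5, r6]

        // Failure: restore the position checkpointed after window 1, then replay
        // window 2 with a different poll size. The logged range wins.
        input.resetTo(3);
        System.out.println(input.emitWindow(2, 2)); // [r3, r4, r5, r6]
        System.out.println(input.emitWindow(3, 2)); // [r7, r8]
    }
}
```

In the model the window log simply outlives the failure. In a real deployment it has to be persisted, which is presumably why a file-system-backed manager is used.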

For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator
from the org.apache.apex.malhar.kafka package. Following my previous
question, I made sure that all the applicable conditions mentioned there are
satisfied, but I still receive the error below:

2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception. java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
        at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
        at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
        at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
 context: PTContainer[id=4(container_1521139241132_0003_01_000013),state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]
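
Here is my reading of this message, expressed as a simplified model rather than the actual Malhar code. After a reset, the output operator knows which tuples it had already written for the partially completed window (I assume by reading back what it had written to the output topic). It expects the replayed window to contain all of them again so it can skip them instead of writing duplicates. If any are still missing at endWindow, it fails. `ExactlyOnceCheck` and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of an exactly-once output check after recovery.
 * Tuples already written for the partially completed window are expected to be
 * replayed by upstream; each one is skipped once instead of being written again.
 */
public class ExactlyOnceCheck {
    private final Map<String, Integer> pending = new HashMap<>(); // tuple -> times still expected
    private final List<String> sink;                              // stand-in for the output topic

    public ExactlyOnceCheck(List<String> writtenBeforeFailure, List<String> sink) {
        for (String t : writtenBeforeFailure) {
            pending.merge(t, 1, Integer::sum);
        }
        this.sink = sink;
    }

    public void processTuple(String tuple) {
        Integer count = pending.get(tuple);
        if (count != null) {
            // Already in the topic from before the failure: skip to avoid a duplicate.
            if (count == 1) {
                pending.remove(tuple);
            } else {
                pending.put(tuple, count - 1);
            }
            return;
        }
        sink.add(tuple); // not written before the failure: write it now
    }

    public void endWindow() {
        // Anything still pending was written before the failure but not replayed,
        // so the replayed window differs from the original one.
        if (!pending.isEmpty()) {
            throw new IllegalStateException(
                "Violates Exactly once. Not all the tuples received after operator reset.");
        }
    }

    public static void main(String[] args) {
        // "a" and "b" of the partial window reached the topic before the failure.
        List<String> topic = new ArrayList<>(Arrays.asList("a", "b"));
        ExactlyOnceCheck op = new ExactlyOnceCheck(Arrays.asList("a", "b"), topic);

        // Upstream replays different content for that window: "b" never arrives.
        op.processTuple("a");
        op.processTuple("c");
        try {
            op.endWindow();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this model is right, the error means the window replayed into the output operator did not match what was emitted for that window before the failure.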

My questions are:

1. Is the way I made the input operator idempotent correct, or am I missing
anything?
2. Do I need to make every operator in the pipeline idempotent to achieve
this? As I understand it, no: once the mapping between tuples and windows is
established at the first operator, it doesn't change anywhere further in the
pipeline.
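
The reasoning in question 2 can be illustrated with a toy model. Suppose the input operator replays each window with exactly the same tuples. Then any downstream operator whose output depends only on those tuples produces the same output for that window on replay. This excludes operators that depend on wall-clock time, randomness, external lookups, or state that is not restored. The model below is plain Java with hypothetical names (`DeterministicReplay`, `replaySafe`), not Apex classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Toy model for question 2 (hypothetical names, no Apex classes).
 * The same window is processed twice, as it would be on replay, and the
 * two outputs are compared.
 */
public class DeterministicReplay {

    /** Applies a per-tuple function to one window's tuples, preserving order. */
    public static List<String> processWindow(List<String> window, Function<String, String> op) {
        List<String> out = new ArrayList<>(window.size());
        for (String t : window) {
            out.add(op.apply(t));
        }
        return out;
    }

    /** True if processing the same window twice yields identical output. */
    public static boolean replaySafe(List<String> window, Function<String, String> op) {
        List<String> original = processWindow(window, op);
        List<String> replayed = processWindow(window, op);
        return original.equals(replayed);
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "c");

        // Output depends only on the tuple: safe under replay.
        Function<String, String> upperCase = String::toUpperCase;

        // Output depends on state that is NOT restored on recovery
        // (e.g. a transient counter): the replay produces different tuples.
        AtomicInteger transientCounter = new AtomicInteger();
        Function<String, String> numbered = t -> t + "#" + transientCounter.incrementAndGet();

        System.out.println(replaySafe(window, upperCase)); // true
        System.out.println(replaySafe(window, numbered));  // false
    }
}
```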

Regards
Vivek



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
