Have you tried instantiating FSWindowDataManager and setting it on the operator instance in populateDAG? There may be state associated with the window manager that will be lost if you set a new instance each time in setup. You should be able to use KafkaSinglePortInputOperator directly.
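To illustrate the concern about replacing the manager in setup, here is a toy model in plain Java. It is only a sketch: ToyWindowManager, ToyOperator and ReplacingOperator are hypothetical stand-ins, not Apex or Malhar classes. It also assumes that the manager keeps some in-memory state and that the operator object is restored before setup runs again, which may not match what FSWindowDataManager actually does internally.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowManagerLifecycleSketch {

    /** Toy stand-in for a window data manager; NOT the Malhar FSWindowDataManager. */
    static class ToyWindowManager {
        private final List<Long> completedWindows = new ArrayList<>();

        void save(long windowId) {
            completedWindows.add(windowId);
        }

        /** Returns the last window recorded, or -1 if nothing has been recorded. */
        long largestCompletedWindow() {
            return completedWindows.isEmpty() ? -1L : completedWindows.get(completedWindows.size() - 1);
        }
    }

    /** Toy operator whose manager is supplied once by the caller and left alone in setup(). */
    static class ToyOperator {
        protected ToyWindowManager manager;

        void setWindowManager(ToyWindowManager manager) {
            this.manager = manager;
        }

        void setup() {
            // Intentionally keeps whatever manager was configured earlier.
        }

        void endWindow(long windowId) {
            manager.save(windowId);
        }
    }

    /** Toy operator that mirrors the quoted subclass: a fresh manager on every setup(). */
    static class ReplacingOperator extends ToyOperator {
        @Override
        void setup() {
            manager = new ToyWindowManager();
            super.setup();
        }
    }

    /** Runs two windows, simulates a redeploy by calling setup() again, then asks the manager what it remembers. */
    static long lastWindowKnownAfterRestart(ToyOperator op) {
        op.setup();
        op.endWindow(1L);
        op.endWindow(2L);
        op.setup(); // simulated redeploy (assumption: the operator's fields survive and setup() runs again)
        return op.manager.largestCompletedWindow();
    }

    public static void main(String[] args) {
        ToyOperator configured = new ToyOperator();
        configured.setWindowManager(new ToyWindowManager()); // set once, analogous to doing it in populateDAG
        System.out.println(lastWindowKnownAfterRestart(configured));
        System.out.println(lastWindowKnownAfterRestart(new ReplacingOperator()));
    }
}
```

In this model the operator configured once still knows about window 2 after the simulated redeploy, while the one that replaces its manager in setup starts from scratch. In a real application the corresponding step would be calling setWindowDataManager (the setter used in your subclass) on the KafkaSinglePortInputOperator instance inside populateDAG.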

On Thu, Mar 15, 2018 at 2:49 PM, Vivek Bhide <vivek.bh...@target.com> wrote:

> This is a continuation of my previous question about
> KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve
> end-to-end exactly-once processing, with data received from one Kafka
> topic and finally posted to another Kafka topic. Below are the three
> things needed, as Pramod mentioned in one of the presentations, to
> achieve this:
>
> 1. Idempotent input operator
> 2. Stateful operator recovery (something that Apex provides out of the box)
> 3. Action by OutputOperator (similar to what AbstractFileOutputOperator
>    does)
>
> I am using the KafkaSinglePortInputOperator from the
> org.apache.apex.malhar.kafka package, which is not idempotent by default.
> I made it idempotent as below:
>
> public abstract class CustomAbstractKafkaInputOperator extends
>     AbstractKafkaInputOperator {
>
>     @Override
>     public void setup(OperatorContext context) {
>         super.setWindowDataManager(new FSWindowDataManager());
>         super.setup(context);
>     }
>
> }
>
> public class CustomKafkaSinglePortInputOperator extends
>     CustomAbstractKafkaInputOperator {
>
>     public final transient DefaultOutputPort<byte[]> outputPort =
>         new DefaultOutputPort<>();
>
>     @Override
>     public AbstractKafkaConsumer createConsumer(Properties properties) {
>         return new KafkaConsumer09(properties);
>     }
>
>     @Override
>     protected void emitTuple(String cluster,
>         ConsumerRecord<byte[], byte[]> message) {
>         outputPort.emit(message.value());
>     }
> }
>
> For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator
> from the org.apache.apex.malhar.kafka package. With reference to my previous
> question, I made sure that all the applicable conditions mentioned there
> are satisfied, and even after that I am receiving the error below:
>
> 2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent:
> child msg: Stopped running due to an exception. java.lang.RuntimeException:
> Violates Exactly once. Not all the tuples received after operator reset.
>     at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
>     at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
>     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
> context:
> PTContainer[id=4(container_1521139241132_0003_01_000013),state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]
>
> The questions I have are:
>
> 1. Is the way I made the input operator idempotent correct, or am I
>    missing anything?
> 2. Do I need to make each and every operator in the pipeline idempotent
>    to achieve this? As per my understanding, no, because once the mapping
>    between tuple and window is established at the first operator, it
>    doesn't change anywhere further in the pipeline.
>
> Regards
> Vivek
>
> --
> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/