Have you tried instantiating FSWindowDataManager and setting it on the
operator instance in populateDAG? There may be state associated with the
window manager that will be lost if you set a new instance each time in
setup. You should be able to use KafkaSinglePortInputOperator directly.
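
To illustrate the concern, here is a toy model in plain Java. None of
these classes are Apex classes: ToyWindowManager, ToyInputOperator and
WindowManagerDemo are hypothetical stand-ins, and the model assumes the
manager's state is restored together with the operator before setup is
called again on redeploy. It is a sketch of the reasoning, not of how
FSWindowDataManager is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in (NOT an Apex class) for a window data manager that
// remembers which windows have completed.
class ToyWindowManager {
    private final List<Long> completedWindows = new ArrayList<>();

    void save(long windowId) {
        completedWindows.add(windowId);
    }

    // Returns -1 when no window has been recorded.
    long largestCompletedWindow() {
        return completedWindows.isEmpty()
                ? -1L
                : completedWindows.get(completedWindows.size() - 1);
    }
}

// Toy stand-in (NOT an Apex class) for an input operator that delegates
// window bookkeeping to its manager.
class ToyInputOperator {
    private final boolean replaceManagerInSetup;
    private ToyWindowManager windowManager;

    ToyInputOperator(boolean replaceManagerInSetup) {
        this.replaceManagerInSetup = replaceManagerInSetup;
    }

    void setWindowManager(ToyWindowManager manager) {
        this.windowManager = manager;
    }

    // Called on first deploy and again on every redeploy.
    void setup() {
        if (replaceManagerInSetup) {
            // Mirrors calling setWindowDataManager(new ...) inside setup:
            // whatever the previous manager knew is discarded.
            windowManager = new ToyWindowManager();
        }
    }

    void endWindow(long windowId) {
        windowManager.save(windowId);
    }

    long largestCompletedWindow() {
        return windowManager.largestCompletedWindow();
    }
}

class WindowManagerDemo {
    // Runs two windows, simulates a redeploy, and reports what the
    // operator still knows about completed windows afterwards.
    static long largestWindowAfterRedeploy(boolean replaceManagerInSetup) {
        ToyInputOperator operator = new ToyInputOperator(replaceManagerInSetup);
        // Configured once, analogous to setting the manager in populateDAG.
        operator.setWindowManager(new ToyWindowManager());
        operator.setup();
        operator.endWindow(1L);
        operator.endWindow(2L);
        // Simulated redeploy: the same (restored) operator gets setup again.
        operator.setup();
        return operator.largestCompletedWindow();
    }
}
```

In this model, WindowManagerDemo.largestWindowAfterRedeploy(false) returns
2, while largestWindowAfterRedeploy(true) returns -1, because the manager
that recorded windows 1 and 2 was replaced during the second setup.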

On Thu, Mar 15, 2018 at 2:49 PM, Vivek Bhide <vivek.bh...@target.com> wrote:

> This is a continuation of my previous question about
> KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end
> exactly-once processing, with data received from one Kafka topic and
> finally posted to another Kafka topic. Below are the three things needed,
> as Pramod mentioned in one of the presentations, to achieve this:
>
> 1. Idempotent input operator
> 2. Stateful operator recovery (something that Apex provides out of the box)
> 3. Action by OutputOperator (similar to what AbstractFileOutputOperator
> does)
>
> I am using the KafkaSinglePortInputOperator from the
> org.apache.apex.malhar.kafka package, which is not idempotent by default.
> I made it idempotent as below:
>
> public abstract class CustomAbstractKafkaInputOperator extends AbstractKafkaInputOperator {
>
>         @Override
>         public void setup(OperatorContext context) {
>                 super.setWindowDataManager(new FSWindowDataManager());
>                 super.setup(context);
>         }
>
> }
>
> public class CustomKafkaSinglePortInputOperator extends CustomAbstractKafkaInputOperator {
>
>         public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();
>
>         @Override
>         public AbstractKafkaConsumer createConsumer(Properties properties) {
>                 return new KafkaConsumer09(properties);
>         }
>
>         @Override
>         protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
>                 outputPort.emit(message.value());
>         }
> }
>
> For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator
> from the org.apache.apex.malhar.kafka package. With reference to my previous
> question, I made sure that all applicable conditions mentioned there are
> satisfied, and even so I am receiving the error below:
>
> 2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent:
> child msg: Stopped running due to an exception. java.lang.RuntimeException:
> Violates Exactly once. Not all the tuples received after operator reset.
>         at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
>         at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
>         at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>         at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
>  context: PTContainer[id=4(container_1521139241132_0003_01_000013), state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]
>
> The questions I have are:
>
> 1. Is the way I made the input operator idempotent correct, or am I missing
> anything?
> 2. Do I need to make each and every operator in the pipeline idempotent to
> achieve this? As per my understanding, no, because once the mapping
> between tuple and window is established at the first operator, it doesn't
> change anywhere further in the pipeline.
>
> Regards
> Vivek
>
>
>
> --
> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
>