Thomas,

Below is the operator implementation we are trying to run. This operator receives an object of the Tenant class from the upstream operator.

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {

            try {
                Producer<String, String> producer = getKafkaProducer();
                //ObjectMapper mapper = new ObjectMapper();
                long now = System.currentTimeMillis();
                //Configuration conf = HBaseConfiguration.create();
                //TenantDao dao = new TenantDao(conf);
                //ArrayList<Put> puts = new ArrayList<>();
                if (tenant != null) {
                    //Tenant tenant = tenant.next();
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                        //puts.add(dao.mkPut(tenant));
                    } else {
                        producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                    }
                    producer.flush();
                }
            }

After building the application, it throws an error during launch:

An error occurred trying to launch the application. Server message:
java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

> Thomas,
>
> I was trying to refer to the input from previous operator.
>
> Another thing when we extend the AbstractKafkaOutputOperator, do we need
> to specify <String, T> ? Since we are getting an object of class type from
> previous operator.
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>
>> Are you referring to the upstream operator in the DAG or the state of the
>> previous application after relaunch? Since the data is stored in MapR
>> streams, an operator that is a producer can also act as a consumer. Please
>> clarify your question.
>>
>>
>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>
>>> Hi Thomas,
>>>
>>> I have a question, so when we are using
>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>> maprstream topic will it be able to read messages from the previous
>>> operator ?
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> For recovery you need to set the window data manager like so:
>>>>
>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>
>>>> That will also apply to stateful restart of the entire application
>>>> (relaunch from previous instance's checkpointed state).
>>>>
>>>> For cold restart, you would need to consider the property you mention
>>>> and decide what is applicable to your use case.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>
>>>>> One more thing, so you mentioned about checkpointing the offset ranges
>>>>> to replay in same order from kafka.
>>>>>
>>>>> Is there any property we need to configure to do that? like
>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>>
>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>
>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>> call it also end-to-end exactly-once).
>>>>>> There is no such thing as exactly-once processing in a distributed
>>>>>> system. In this case it would be rather "produce exactly-once".
>>>>>> Upstream operators, on failure, will recover to checkpointed state
>>>>>> and re-process the stream from there. This is at-least-once, the
>>>>>> default behavior. Because you have configured the input operator to
>>>>>> replay in the same order from Kafka (this is done by checkpointing
>>>>>> the offset ranges), the computation in the DAG is idempotent and the
>>>>>> output operator can discard the results that were already published
>>>>>> instead of producing duplicates.
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> I think this is something called a customized operator
>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>
>>>>>>> What if any previous operators fail ? How we can make sure they also
>>>>>>> recover using EXACTLY_ONCE processing mode ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>
>>>>>>>> In that case please have a look at:
>>>>>>>>
>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>
>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>> the stated assumptions.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> In our case we are writing the results back to maprstreams topic based
>>>>>>>>> on some validations.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>> systems?
>>>>>>>>>>
>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The
>>>>>>>>>> results are written to JDBC. That operator by itself supports
>>>>>>>>>> exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>> input), hence there is no need to configure the processing mode at
>>>>>>>>>> all.
>>>>>>>>>>
>>>>>>>>>> Thomas
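
For anyone reading this thread later, the "replay in the same order, then discard what was already published" idea described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Malhar operator's code: the class ExactlyOnceOutputSketch, its recover() and process() methods, and the in-memory list standing in for the stream topic are all hypothetical, and the sketch assumes the upstream replay delivers the same tuples as before the failure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an output stage that skips tuples already written
// before a failure, so that replayed (at-least-once) input does not
// produce duplicates in the external stream.
public class ExactlyOnceOutputSketch {

    // Stands in for the external topic; a real operator would use a producer.
    private final List<String> topic;

    // Messages the interrupted window had already written, with counts.
    private final Map<String, Integer> alreadyWritten = new HashMap<>();

    public ExactlyOnceOutputSketch(List<String> topic) {
        this.topic = topic;
    }

    // On recovery, remember what was published for the interrupted window.
    public void recover(List<String> writtenInInterruptedWindow) {
        alreadyWritten.clear();
        for (String message : writtenInInterruptedWindow) {
            alreadyWritten.merge(message, 1, Integer::sum);
        }
    }

    // Publish a tuple unless the replay shows it was already published.
    public void process(String message) {
        Integer count = alreadyWritten.get(message);
        if (count != null) {
            if (count == 1) {
                alreadyWritten.remove(message);
            } else {
                alreadyWritten.put(message, count - 1);
            }
            return; // discard: this tuple reached the topic before the failure
        }
        topic.add(message);
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();

        // First attempt: "a" and "b" are published, then the operator fails.
        ExactlyOnceOutputSketch first = new ExactlyOnceOutputSketch(topic);
        first.process("a");
        first.process("b");

        // After recovery the upstream replays the whole window in order.
        ExactlyOnceOutputSketch recovered = new ExactlyOnceOutputSketch(topic);
        recovered.recover(new ArrayList<>(topic));
        for (String message : new String[] {"a", "b", "c"}) {
            recovered.process(message);
        }

        System.out.println(topic); // prints [a, b, c]
    }
}
```

The count map (rather than a plain set) is a design choice of the sketch so that a message legitimately repeated within a window is skipped only as many times as it was actually written.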