Siyuan,

So for the output operator, we have specified the topic as part of our logic itself:
public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

  public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

    Gson gson = new Gson();

    @Override
    public void process(Tenant tenant) {

      try {
        Producer<String, String> producer = getKafkaProducer();
        //ObjectMapper mapper = new ObjectMapper();
        long now = System.currentTimeMillis();
        //Configuration conf = HBaseConfiguration.create();
        //TenantDao dao = new TenantDao(conf);
        //ArrayList<Put> puts = new ArrayList<>();
        if (tenant != null) {
          //Tenant tenant = tenant.next();
          if (StringUtils.isNotEmpty(tenant.getGl())) {
            // Tenants with a GL value go to the certUpdate topic
            producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
            //puts.add(dao.mkPut(tenant));
          } else {
            // Everything else goes to the error topic
            producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
          }
          producer.flush();
        }
      } catch (Exception e) {
        LOG.error("Failed to publish tenant record", e);
        throw new RuntimeException(e);
      }
    }
  };
}

Thanks!!

On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Jaspal,
>
> I think you missed the kafkaOut :)
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>
>> Siyuan,
>>
>> That's how we have set it in the properties file:
>>
>> [image: Inline image 1]
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>
>>> Jaspal,
>>>
>>> Topic is a mandatory property you have to set.
>>> In MapR, the value should be set to the full stream path, for example: /your/stream/path:streamname
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>
>>>> After making the change, we are getting the error below during application launch:
>>>>
>>>> An error occurred trying to launch the application. Server message: javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f, propertyPath='topic', message='may not be null',
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> So I just changed the malhar-kafka version to 3.5.0, and I was able to import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>>
>>>>> Thanks for your inputs!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Should we use malhar-library version 3.5 then?
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This operator is not in malhar-library; it's a separate module.
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Siyuan,
>>>>>>>>
>>>>>>>> I am using the same Kafka producer you mentioned, but I don't see AbstractKafkaOutputOperator in the malhar library when importing.
>>>>>>>>
>>>>>>>> Thanks!!
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator. Only org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works with MapR Streams; the contrib operator only works with Kafka 0.8.* or 0.9.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>
>>>>>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make sure the producer you use there is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. The latter is the old API, which MapR Streams does not support.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas,
>>>>>>>>>>>
>>>>>>>>>>> Below is the operator implementation we are trying to run. This operator receives a Tenant object from the upstream operator.
>>>>>>>>>>>
>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>
>>>>>>>>>>>   private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>
>>>>>>>>>>>   public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>
>>>>>>>>>>>     Gson gson = new Gson();
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void process(Tenant tenant) {
>>>>>>>>>>>
>>>>>>>>>>>       try {
>>>>>>>>>>>         Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>         //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>         long now = System.currentTimeMillis();
>>>>>>>>>>>         //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>         //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>         //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>         if (tenant != null) {
>>>>>>>>>>>           //Tenant tenant = tenant.next();
>>>>>>>>>>>           if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>             producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>             //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>           } else {
>>>>>>>>>>>             producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>           }
>>>>>>>>>>>           producer.flush();
>>>>>>>>>>>         }
>>>>>>>>>>>       } catch (Exception e) {
>>>>>>>>>>>         LOG.error("Failed to publish tenant record", e);
>>>>>>>>>>>         throw new RuntimeException(e);
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>   };
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred trying to launch the application.
>>>>>>>>>>> Server message: java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>>>>>>>   at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>>>>>>>   at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>>>>>>>   at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>>>>>>>   at
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need to specify <String, T>, since we are getting an object of a class type from the previous operator?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a question: when we use *KafkaSinglePortExactlyOnceOutputOperator* to write results into a MapR Streams topic, will it be able to read messages from the previous operator?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For recovery, you need to set the window data manager like so:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That will also apply to a stateful restart of the entire application (relaunch from the previous instance's checkpointed state).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For a cold restart, you would need to consider the property you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> OK, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there any property we need to configure to do that, like initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system.
>>>>>>>>>>>>>>>>> In this case it would rather be "produce exactly-once." Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because you have configured the input operator to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent, and the output operator can discard the results that were already published instead of producing duplicates.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think this is what's called a customized operator implementation, which takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What if any of the previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In that case, please have a look at:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated, under the stated assumptions.
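Thomas's description of exactly-once output (the upstream replays the same data in the same order, and the output operator discards results it already published instead of producing duplicates) can be illustrated with a small, self-contained Java model. This is a simplified sketch under stated assumptions, not the implementation of Malhar's KafkaSinglePortExactlyOnceOutputOperator: the class names, the "windowId:message" encoding, and the recovery step are hypothetical. A List<String> stands in for the MapR Streams topic, and recovery is modelled as scanning that list for entries written after the checkpointed window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExactlyOnceOutputSketch {

  // Hypothetical output operator model; the topic is a List<String> of "windowId:message" entries.
  static class DedupOutput {
    private final List<String> topic;
    // Messages already present in the topic for windows after the checkpoint, grouped by window.
    private final Map<Long, List<String>> alreadyWritten = new HashMap<>();

    DedupOutput(List<String> topic, long checkpointedWindowId) {
      this.topic = topic;
      // Recovery step: collect output written after the checkpoint but before the failure.
      for (String entry : topic) {
        int sep = entry.indexOf(':');
        long windowId = Long.parseLong(entry.substring(0, sep));
        if (windowId > checkpointedWindowId) {
          alreadyWritten.computeIfAbsent(windowId, k -> new ArrayList<>()).add(entry.substring(sep + 1));
        }
      }
    }

    void process(long windowId, String message) {
      List<String> seen = alreadyWritten.get(windowId);
      if (seen != null && seen.remove(message)) {
        return; // published before the failure: discard instead of duplicating
      }
      topic.add(windowId + ":" + message);
    }

    void endWindow(long windowId) {
      alreadyWritten.remove(windowId); // nothing left to match in this window
    }
  }

  // Idempotent upstream: every replay delivers the same messages in the same windows (1 and 2).
  private static final String[][] WINDOWS = {{"a", "b"}, {"c", "d", "e"}};

  // Feeds all windows to the operator, simulating a failure after maxMessages messages.
  static void run(DedupOutput out, int maxMessages) {
    int delivered = 0;
    for (int i = 0; i < WINDOWS.length; i++) {
      long windowId = i + 1;
      for (String message : WINDOWS[i]) {
        if (delivered++ == maxMessages) {
          return; // simulated failure
        }
        out.process(windowId, message);
      }
      out.endWindow(windowId);
    }
  }

  static List<String> simulate() {
    List<String> topic = new ArrayList<>();
    run(new DedupOutput(topic, 0), 4);                 // fails after writing a, b, c, d
    run(new DedupOutput(topic, 0), Integer.MAX_VALUE); // recovers from checkpoint window 0 and replays
    return topic;
  }

  public static void main(String[] args) {
    System.out.println(simulate());
  }
}
```

Running it prints [1:a, 1:b, 2:c, 2:d, 2:e]: the replay re-delivers a through d, but only e is actually added to the topic. Matching on message content is a simplification; a real operator needs a reliable way to recognise which records of the interrupted window already reached the topic.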
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In our case, we are writing the results back to a MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Which operators in your application are writing to external systems?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thomas
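Thomas's point about the JDBC output in the exactly-once tutorial (exactly-once through transactions, given idempotent input) can be modelled with plain collections. A common way to get this effect is to commit the last processed window id in the same transaction as the data, so a replayed window can be recognised and skipped. The Java below is a hedged sketch of that idea, not Malhar's JDBC operator: FakeDatabase, TransactionalOutput and the window layout are hypothetical, and the atomic transaction is simulated by updating the rows and the window id in one method call.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionalOutputSketch {

  // Stand-in for the database: data rows plus a meta entry holding the last committed window id.
  static class FakeDatabase {
    final List<String> rows = new ArrayList<>();
    long committedWindowId = 0; // no window committed yet

    // Rows and window id change together, modelling one atomic transaction.
    void commit(List<String> newRows, long windowId) {
      rows.addAll(newRows);
      committedWindowId = windowId;
    }
  }

  // Hypothetical output operator: buffers a window's rows and commits them at endWindow.
  static class TransactionalOutput {
    private final FakeDatabase db;
    private final List<String> pending = new ArrayList<>();
    private boolean skipWindow;

    TransactionalOutput(FakeDatabase db) {
      this.db = db;
    }

    void beginWindow(long windowId) {
      // A replayed window at or below the stored id was already committed: ignore it.
      skipWindow = windowId <= db.committedWindowId;
      pending.clear();
    }

    void process(String row) {
      if (!skipWindow) {
        pending.add(row);
      }
    }

    void endWindow(long windowId) {
      if (!skipWindow) {
        db.commit(pending, windowId);
      }
    }
  }

  // Idempotent upstream: the same rows arrive in the same windows on every replay.
  private static final String[][] WINDOWS = {{"a", "b"}, {"c", "d"}};

  // Runs windows 1..n; failInWindow names a window that fails before its commit (0 = no failure).
  static void run(TransactionalOutput out, long failInWindow) {
    for (int i = 0; i < WINDOWS.length; i++) {
      long windowId = i + 1;
      out.beginWindow(windowId);
      for (String row : WINDOWS[i]) {
        out.process(row);
      }
      if (windowId == failInWindow) {
        return; // simulated failure: this window's transaction never commits
      }
      out.endWindow(windowId);
    }
  }

  static List<String> simulate() {
    FakeDatabase db = new FakeDatabase();
    run(new TransactionalOutput(db), 2); // window 1 commits, window 2 is lost
    run(new TransactionalOutput(db), 0); // replay of windows 1 and 2 after recovery
    return db.rows;
  }

  public static void main(String[] args) {
    System.out.println(simulate());
  }
}
```

Running it prints [a, b, c, d]: window 1 is skipped on replay because its id was committed with its rows, and window 2, whose transaction never committed, is written exactly once. Without idempotent input, the replayed window 2 could contain different rows, which is why Thomas notes the two properties work in conjunction.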