Siyuan,

That's how we have given it in the properties file:

[image: Inline image 1]

Thanks!!

On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Jaspal,
>
> Topic is a mandatory property you have to set. In MapR, the value should
> be set to the full stream path, for example: /your/stream/path:streamname
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>
>> After making the change, we are getting the below error during application
>> launch:
>>
>> *An error occurred trying to launch the application. Server message:
>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>> constraints
>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>> propertyPath='topic', message='may not be null', *
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>
>>> So I just changed the malhar-kafka version to 3.5.0, and I was able to
>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>
>>> Thanks for your inputs!!
>>>
>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>
>>>> Should we use malhar-library version 3.5 then?
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>> This operator is not in malhar-library, it's a separate module.
>>>>>
>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Siyuan,
>>>>>>
>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while importing.
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>>
>>>>>>> Also, which Kafka output operator are you using?
>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works
>>>>>>> with MapR stream; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hey Jaspal,
>>>>>>>>
>>>>>>>> Did you add any code to the existing
>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator from malhar? If so, please
>>>>>>>> make sure the producer you use here is
>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>>>>>>>> supported by MapR stream.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>>>
>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>
>>>>>>>>>     private static final Logger LOG =
>>>>>>>>>         LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>
>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>
>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>
>>>>>>>>>             try {
>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>                     } else {
>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>
>>>>>>>>>                     }
>>>>>>>>>                     producer.flush();
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>
>>>>>>>>> An error occurred trying to launch the application.
>>>>>>>>> Server message: java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>>>>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>>>>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>>>>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>>>>>     at
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,
>>>>>>>>>>
>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>
>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>>> we need to specify <String, T>? We are getting an object of a class
>>>>>>>>>> type from the previous operator.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>> state of the previous application after relaunch? Since the data is
>>>>>>>>>>> stored in MapR streams, an operator that is a producer can also act
>>>>>>>>>>> as a consumer. Please clarify your question.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>> into a maprstream topic, will it be able to read messages from the
>>>>>>>>>>>> previous operator?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>
>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed
>>>>>>>>>>>>> state).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the
>>>>>>>>>>>>>> offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there any property we need to configure to do that? Like
>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>> why we also call it end-to-end exactly-once). There is no such
>>>>>>>>>>>>>>> thing as exactly-once processing in a distributed system. In this
>>>>>>>>>>>>>>> case it would rather be "produce exactly-once".
>>>>>>>>>>>>>>> Upstream operators, on failure, will recover to checkpointed
>>>>>>>>>>>>>>> state and re-process the stream from there. This is
>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input
>>>>>>>>>>>>>>> operator you have configured to replay in the same order from
>>>>>>>>>>>>>>> Kafka (this is done by checkpointing the offset ranges), the
>>>>>>>>>>>>>>> computation in the DAG is idempotent and the output operator can
>>>>>>>>>>>>>>> discard the results that were already published instead of
>>>>>>>>>>>>>>> producing duplicates.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing
>>>>>>>>>>>>>>>> at output.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What if any previous operators fail? How we can make sure
>>>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The
>>>>>>>>>>>>>>>>>>> results are written to JDBC. That operator by itself supports
>>>>>>>>>>>>>>>>>>> exactly-once through transactions (in conjunction with
>>>>>>>>>>>>>>>>>>> idempotent input), hence there is no need to configure the
>>>>>>>>>>>>>>>>>>> processing mode at all.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thomas
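
The "exactly-once output" idea discussed in this thread (upstream replays in the same order after a failure, and the output side discards what was already published) can be illustrated with a small, library-free sketch. This is only a rough model under stated assumptions, not how the Malhar operator is implemented: FakeTopic, DedupSink, and the "windowId:sequence:payload" record format are hypothetical names invented for this example, and no Kafka or MapR Streams API is used.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: at-least-once replay plus a de-duplicating sink
// gives the effect of exactly-once output.
class ExactlyOnceSketch {

    // Stand-in for the external topic; NOT a Kafka or MapR Streams API.
    static final class FakeTopic {
        final List<String> log = new ArrayList<>();
    }

    // Sink that skips records an earlier attempt already published.
    static final class DedupSink {
        private final FakeTopic topic;
        private final Set<String> alreadyPublished;

        DedupSink(FakeTopic topic) {
            this.topic = topic;
            // On (re)start, learn what previous attempts wrote to the topic.
            this.alreadyPublished = new HashSet<>(topic.log);
        }

        // Relies on replay being deterministic: the same window and the same
        // position in the window always carry the same payload.
        void write(long windowId, int sequence, String payload) {
            String record = windowId + ":" + sequence + ":" + payload;
            if (alreadyPublished.add(record)) {
                topic.log.add(record);
            }
        }
    }

    static List<String> run() {
        FakeTopic topic = new FakeTopic();
        String[] window1 = {"a", "b", "c"};

        // First attempt: the operator "fails" after publishing two records.
        DedupSink firstAttempt = new DedupSink(topic);
        firstAttempt.write(1, 0, window1[0]);
        firstAttempt.write(1, 1, window1[1]);

        // Recovery: upstream replays the whole window in the same order.
        DedupSink recovered = new DedupSink(topic);
        for (int i = 0; i < window1.length; i++) {
            recovered.write(1, i, window1[i]);
        }
        return topic.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1:0:a, 1:1:b, 1:2:c]
    }
}
```

The sketch only works because the replay is deterministic; if the input could arrive in a different order after recovery, the sink could no longer tell replayed records from new ones, which is why the thread stresses idempotent input.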