Should we use malhar-library version 3.5 then?
Thanks!!

On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:

> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This operator is not in malhar-library; it's a separate module.
>
> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>
>> Hi Siyuan,
>>
>> I am using the same Kafka producer as you mentioned, but I am not seeing AbstractKafkaOutputOperator in malhar-library when importing it.
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>
>>> Also, which Kafka output operator are you using? Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator. Only org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>
>>>> Hey Jaspal,
>>>>
>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make sure the producer you use there is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. That is the old API, and it is not supported by MapR Streams.
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Thomas,
>>>>>
>>>>> Below is the operator implementation we are trying to run. This operator receives a Tenant object from the upstream operator.
>>>>>
>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>
>>>>>   private static final Logger LOG =
>>>>>       LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>
>>>>>   public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>
>>>>>     Gson gson = new Gson();
>>>>>
>>>>>     @Override
>>>>>     public void process(Tenant tenant) {
>>>>>       try {
>>>>>         Producer<String, String> producer = getKafkaProducer();
>>>>>         //ObjectMapper mapper = new ObjectMapper();
>>>>>         long now = System.currentTimeMillis();
>>>>>         //Configuration conf = HBaseConfiguration.create();
>>>>>         //TenantDao dao = new TenantDao(conf);
>>>>>         //ArrayList<Put> puts = new ArrayList<>();
>>>>>         if (tenant != null) {
>>>>>           //Tenant tenant = tenant.next();
>>>>>           if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>             producer.send(new ProducerRecord<String, String>(
>>>>>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>                 tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>             //puts.add(dao.mkPut(tenant));
>>>>>           } else {
>>>>>             producer.send(new ProducerRecord<String, String>(
>>>>>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>                 tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>           }
>>>>>           producer.flush();
>>>>>         }
>>>>>       } catch (Exception e) {
>>>>>         LOG.error("Failed to send tenant record", e);
>>>>>       }
>>>>>     }
>>>>>   };
>>>>> }
>>>>>
>>>>> After building the application, it throws an error during launch:
>>>>>
>>>>> An error occurred trying to launch the application.
>>>>> Server message:
>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>     at
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Thomas,
>>>>>>
>>>>>> I was referring to the input from the previous operator.
>>>>>>
>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need to specify <String, T>, since we are getting an object of a class type from the previous operator?
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> Are you referring to the upstream operator in the DAG or to the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Thomas,
>>>>>>>>
>>>>>>>> I have a question: when we use *KafkaSinglePortExactlyOnceOutputOperator* to write results into a MapR Streams topic, will it be able to read messages from the previous operator?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> For recovery, you need to set the window data manager like so:
>>>>>>>>>
>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>
>>>>>>>>> That will also apply to a stateful restart of the entire application (relaunch from the previous instance's checkpointed state).
>>>>>>>>>
>>>>>>>>> For a cold restart, you would need to consider the property you mention and decide what is applicable to your use case.
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> OK, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>
>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to replay in the same order from Kafka.
>>>>>>>>>>
>>>>>>>>>> Is there any property we need to configure to do that, such as setting initialOffset to APPLICATION_OR_LATEST?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system. In this case it would rather be "produce exactly-once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior.
>>>>>>>>>>> Because in the input operator you have configured replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent, and the output operator can discard the results that were already published instead of producing duplicates.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think this is what's called a customized operator implementation that takes care of exactly-once processing at the output.
>>>>>>>>>>>>
>>>>>>>>>>>> What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>
>>>>>>>>>>>>> The operator will ensure that messages are not duplicated, under the stated assumptions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In our case, we are writing the results back to a MapR Streams topic based on some validations.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Which operators in your application are writing to external systems?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
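
The end-to-end exactly-once idea described in this thread (upstream operators replay the same tuples in the same order from checkpointed offsets, and the output side discards results that were already published) can be illustrated with a small, self-contained Java simulation. This is a hypothetical model for illustration only: the names ExactlyOnceSketch, DedupOutput, Message and run are invented here, an in-memory list stands in for the MapR Streams/Kafka topic, and it is not how KafkaSinglePortExactlyOnceOutputOperator is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of end-to-end exactly-once output.
// An in-memory list stands in for the external topic; this is NOT the
// KafkaSinglePortExactlyOnceOutputOperator implementation.
public class ExactlyOnceSketch {

  /** A message in the "topic", tagged with the window that produced it (hypothetical format). */
  static final class Message {
    final long windowId;
    final String payload;

    Message(long windowId, String payload) {
      this.windowId = windowId;
      this.payload = payload;
    }
  }

  /** Output side: skips tuples that already reached the topic before a failure. */
  static final class DedupOutput {
    private final List<Message> topic;
    private long windowId;
    private int seenInWindow;
    private int alreadyPublished;

    DedupOutput(List<Message> topic) {
      this.topic = topic;
    }

    void beginWindow(long id) {
      windowId = id;
      seenInWindow = 0;
      // Inspect the topic itself to learn how much of this window was already written.
      alreadyPublished = (int) topic.stream().filter(m -> m.windowId == id).count();
    }

    void process(String tuple) {
      // Replay is assumed deterministic: the n-th tuple of a window is identical on every attempt.
      if (seenInWindow++ < alreadyPublished) {
        return; // published before the failure: discard instead of duplicating
      }
      topic.add(new Message(windowId, tuple));
    }
  }

  /** Simulates a crash midway through window 2, then a replay of windows 1 and 2. */
  static List<String> run() {
    List<Message> topic = new ArrayList<>();
    List<List<String>> windows = Arrays.asList(
        Arrays.asList("a", "b"),   // window 1
        Arrays.asList("c", "d"));  // window 2

    // First attempt: window 1 completes, then the operator fails after publishing "c".
    DedupOutput first = new DedupOutput(topic);
    first.beginWindow(1);
    windows.get(0).forEach(first::process);
    first.beginWindow(2);
    first.process("c");

    // Recovery from a checkpoint taken before window 1: both windows are replayed
    // in the same order (at-least-once delivery into the output operator).
    DedupOutput recovered = new DedupOutput(topic);
    for (int w = 0; w < windows.size(); w++) {
      recovered.beginWindow(w + 1);
      windows.get(w).forEach(recovered::process);
    }

    List<String> payloads = new ArrayList<>();
    for (Message m : topic) {
      payloads.add(m.payload);
    }
    return payloads;
  }

  public static void main(String[] args) {
    System.out.println(run()); // [a, b, c, d]
  }
}
```

The sketch only works because of the assumption stated in the thread: the replayed input is idempotent (same tuples, same order, same windows). If the upstream replay were not deterministic, counting the tuples already written per window would no longer identify duplicates correctly.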