Hi Siyuan,

I am using the same Kafka producer as you mentioned, but I am not seeing AbstractKafkaOutputOperator in the Malhar library when I import it.
Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Also, which Kafka output operator are you using? Please use
> org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of
> com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator. Only the
> org.apache.apex.malhar.kafka version works with MapR Streams; the latter
> only works with Kafka 0.8.* or 0.9.
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Hey Jaspal,
>>
>> Did you add any code to the existing
>> KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make
>> sure the producer you use is
>> org.apache.kafka.clients.producer.KafkaProducer instead of
>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>> supported by MapR Streams.
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>
>>> Thomas,
>>>
>>> Below is the operator implementation we are trying to run. This operator
>>> gets an object of the Tenant class from the upstream operator.
>>>
>>> public class KafkaSinglePortExactlyOnceOutputOperator extends
>>>     AbstractKafkaOutputOperator {
>>>
>>>   private static final Logger LOG =
>>>       LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>
>>>   public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>
>>>     private final Gson gson = new Gson();
>>>
>>>     @Override
>>>     public void process(Tenant tenant)
>>>     {
>>>       try {
>>>         Producer<String, String> producer = getKafkaProducer();
>>>         if (tenant != null) {
>>>           if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>             producer.send(new ProducerRecord<String, String>(
>>>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>                 tenant.getVolumeName(), gson.toJson(tenant)));
>>>           } else {
>>>             producer.send(new ProducerRecord<String, String>(
>>>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>                 tenant.getVolumeName(), gson.toJson(tenant)));
>>>           }
>>>           producer.flush();
>>>         }
>>>       } catch (Exception e) {
>>>         LOG.error("Failed to publish tenant record", e);
>>>       }
>>>     }
>>>   };
>>> }
>>>
>>> After building the application, it throws an error during launch:
>>>
>>> An error occurred trying to launch the application. Server message:
>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>> java.lang.Class.getDeclaredFields(Class.java:1916) at ...
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>
>>>> Thomas,
>>>>
>>>> I was trying to refer to the input from the previous operator.
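[The NoClassDefFoundError above points at the old kafka.javaapi.producer.Producer API that Siyuan warned about. A minimal sketch of the configuration the new org.apache.kafka.clients.producer.KafkaProducer expects is shown below; the serializer class names are the standard ones shipped with kafka-clients, while the bootstrap address and the MapR stream path mentioned in the comments are placeholders, not values taken from this thread.]

```java
import java.util.Properties;

public class NewApiProducerConfig {

    // Builds the Properties the new-API KafkaProducer constructor expects.
    // With MapR Streams the topic is addressed by its stream path
    // (e.g. "/path/to/stream:topic"), so bootstrap.servers may not be used
    // for broker discovery, but the serializers still must be configured.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // The producer itself would then be created with:
        //   new org.apache.kafka.clients.producer.KafkaProducer<>(producerProps())
        System.out.println(producerProps().getProperty("key.serializer"));
        // prints "org.apache.kafka.common.serialization.StringSerializer"
    }
}
```

[If the old kafka.javaapi.producer.Producer class appears anywhere in the operator or its dependencies, the class loader will fail exactly as in the stack trace above, since the MapR client jars do not ship that class.]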
>>>>
>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need
>>>> to specify <String, T>? Since we are getting an object of a class type
>>>> from the previous operator.
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Are you referring to the upstream operator in the DAG or the state of
>>>>> the previous application after relaunch? Since the data is stored in
>>>>> MapR Streams, an operator that is a producer can also act as a
>>>>> consumer. Please clarify your question.
>>>>>
>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> I have a question: when we are using
>>>>>> KafkaSinglePortExactlyOnceOutputOperator to write results into a
>>>>>> MapR Streams topic, will it be able to read messages from the
>>>>>> previous operator?
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>
>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>
>>>>>>> That will also apply to stateful restart of the entire application
>>>>>>> (relaunch from the previous instance's checkpointed state).
>>>>>>>
>>>>>>> For cold restart, you would need to consider the property you
>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ok, now I get it. Thanks for the nice explanation!!
>>>>>>>>
>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to
>>>>>>>> replay in the same order from Kafka.
>>>>>>>>
>>>>>>>> Is there any property we need to configure to do that?
>>>>>>>> Like initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>> also call it end-to-end exactly-once). There is no such thing as
>>>>>>>>> exactly-once processing in a distributed system; in this case it
>>>>>>>>> would rather be "produce exactly once". Upstream operators, on
>>>>>>>>> failure, will recover to checkpointed state and re-process the
>>>>>>>>> stream from there. This is at-least-once, the default behavior.
>>>>>>>>> Because the input operator is configured to replay in the same
>>>>>>>>> order from Kafka (this is done by checkpointing the offset
>>>>>>>>> ranges), the computation in the DAG is idempotent and the output
>>>>>>>>> operator can discard the results that were already published
>>>>>>>>> instead of producing duplicates.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think this is a customized operator implementation that takes
>>>>>>>>>> care of exactly-once processing at the output.
>>>>>>>>>>
>>>>>>>>>> What if any previous operators fail? How can we make sure they
>>>>>>>>>> also recover using the EXACTLY_ONCE processing mode?
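[Thomas's recipe above — idempotent replay upstream, discard already-published windows downstream — can be illustrated with a toy, framework-free sketch. The class and its beginWindow/process/endWindow callbacks only mimic the Apex operator life cycle and are invented for illustration; the real Malhar operator is more involved, e.g. it also consults the output topic on recovery to decide what was already written.]

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an exactly-once output operator: after a failure, the
// input replays the same windows with the same data (idempotent input),
// and windows at or below the committed watermark are skipped instead of
// being published twice.
public class ExactlyOnceSinkSketch {
    private long committedWindowId = -1;          // highest window already published
    private long currentWindowId = -1;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> published = new ArrayList<>();

    void beginWindow(long windowId) {
        currentWindowId = windowId;
        buffer.clear();
    }

    void process(String tuple) {
        // Replayed windows recompute the same tuples, but they are ignored
        // because the window was already committed.
        if (currentWindowId > committedWindowId) {
            buffer.add(tuple);
        }
    }

    void endWindow() {
        if (currentWindowId > committedWindowId) {
            published.addAll(buffer);             // stand-in for "send to Kafka"
            committedWindowId = currentWindowId;  // ideally atomic with the send
        }
    }

    List<String> published() { return published; }

    public static void main(String[] args) {
        ExactlyOnceSinkSketch sink = new ExactlyOnceSinkSketch();
        sink.beginWindow(1); sink.process("a"); sink.endWindow();
        // failure + recovery: window 1 is replayed, then processing continues
        sink.beginWindow(1); sink.process("a"); sink.endWindow();
        sink.beginWindow(2); sink.process("b"); sink.endWindow();
        System.out.println(sink.published());     // [a, b] — no duplicate "a"
    }
}
```

[The essential property is that replay is deterministic: only because every operator sees the same tuples in the same windows after recovery can the sink safely equate "window id already committed" with "results already published".]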
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>
>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> In our case we are writing the results back to a MapR Streams
>>>>>>>>>>>> topic based on some validations.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Which operators in your application are writing to external
>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>
>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent.
>>>>>>>>>>>>> The results are written to JDBC. That operator by itself
>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction
>>>>>>>>>>>>> with idempotent input), hence there is no need to configure
>>>>>>>>>>>>> the processing mode at all.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
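[The JDBC pattern Thomas describes at the end — committing the result rows and the window id in one database transaction, and skipping windows at or below the stored id after a restart — can be sketched with an in-memory stand-in for the database. All names below are invented for illustration; in Malhar this technique is implemented by the transactionable JDBC output operator, and in a real database both writes would happen inside a single transaction.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of transactional exactly-once output: the rows and the
// window id are "committed" together, so a window replayed after a
// crash is detected via the stored id and skipped.
public class TransactionalSinkSketch {
    private final List<String> table = new ArrayList<>();   // stand-in for the data table
    private final Map<String, Long> meta = new HashMap<>(); // stand-in for window metadata

    void writeWindow(long windowId, List<String> rows) {
        long committed = meta.getOrDefault("lastWindowId", -1L);
        if (windowId <= committed) {
            return; // this window was committed before the failure: skip replay
        }
        // In a real database these two statements run in one transaction,
        // so either both survive a crash or neither does.
        table.addAll(rows);
        meta.put("lastWindowId", windowId);
    }

    List<String> table() { return table; }

    public static void main(String[] args) {
        TransactionalSinkSketch sink = new TransactionalSinkSketch();
        sink.writeWindow(7, Arrays.asList("row1", "row2"));
        sink.writeWindow(7, Arrays.asList("row1", "row2")); // replay after crash
        sink.writeWindow(8, Arrays.asList("row3"));
        System.out.println(sink.table()); // [row1, row2, row3]
    }
}
```

[This is why no PROCESSING_MODE setting is needed in the blog example: the atomic commit of data plus window id, combined with idempotent input, already yields end-to-end exactly-once semantics.]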