Also, which Kafka output operator are you using? Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator. Only the org.apache.apex.malhar.kafka variant works with MapR Streams; the latter works only with Kafka 0.8.* or 0.9.
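The distinction above can be checked at runtime with a quick classpath probe. This is an illustrative sketch rather than code from the thread; the KafkaApiProbe class and its helper are hypothetical, but the two Kafka producer class names are the real ones being discussed:

```java
// Hypothetical helper: probe which Kafka producer API is visible on the classpath.
// A NoClassDefFoundError at launch typically means the operator references a class
// (here the old Scala-based producer) that the runtime classpath does not provide.
class KafkaApiProbe {

  // Returns true if the given class can be loaded from the current classpath.
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}
```

Calling, for example, `KafkaApiProbe.isOnClasspath("kafka.javaapi.producer.Producer")` versus `KafkaApiProbe.isOnClasspath("org.apache.kafka.clients.producer.KafkaProducer")` shows which of the two APIs the application can actually load; only the latter is usable with MapR Streams.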
Regards,
Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make sure the producer you use is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. The latter is the old API and is not supported by MapR Streams.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

Below is the operator implementation we are trying to run. This operator receives an object of the Tenant class from the upstream operator.

import com.datatorrent.api.DefaultInputPort;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG =
        LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        private final Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {
            try {
                Producer<String, String> producer = getKafkaProducer();
                if (tenant != null) {
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                    } else {
                        producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                    }
                    producer.flush();
                }
            } catch (Exception e) {
                LOG.error("Failed to publish record", e);
            }
        }
    };
}

After building the application, it throws the following error
during launch:

An error occurred trying to launch the application. Server message:
java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

I was trying to refer to the input from the previous operator.

Another thing: when we extend AbstractKafkaOutputOperator, do we need to specify <String, T>? Since we are getting an object of a class type from the previous operator.

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:

Are you referring to the upstream operator in the DAG, or to the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

I have a question: when we use KafkaSinglePortExactlyOnceOutputOperator to write results into a MapR Streams topic, will it be able to read messages from the previous operator?

Thanks
Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:

For recovery you need to set the window data manager, like so:

https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33

That will also apply to stateful restart of the entire application (relaunch from the previous instance's checkpointed state).
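The idempotent replay that the window data manager enables can be sketched in plain Java. This is a conceptual illustration only, not Apex code: the IdempotentReplaySketch class and its methods are hypothetical stand-ins for what the Kafka input operator and the WindowDataManager do together, namely recording each window's offset range on the first pass and re-reading exactly that range on replay:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of idempotent replay: on the first pass, each window
// records the offset range it consumed (this is what the WindowDataManager
// persists); on replay after a failure, the saved range is re-read, so the
// window contains exactly the same messages as before.
class IdempotentReplaySketch {
  private final List<String> log;                                   // stands in for a Kafka partition
  private final Map<Long, int[]> windowToOffsets = new HashMap<>(); // "checkpointed" ranges
  private int nextOffset = 0;

  IdempotentReplaySketch(List<String> log) {
    this.log = log;
  }

  // Read up to 'max' messages for this window, or replay the recorded range.
  List<String> readWindow(long windowId, int max) {
    int[] range = windowToOffsets.get(windowId);
    if (range == null) {                                            // first pass: consume and record
      range = new int[] {nextOffset, Math.min(nextOffset + max, log.size())};
      windowToOffsets.put(windowId, range);
      nextOffset = range[1];
    }
    return new ArrayList<>(log.subList(range[0], range[1]));        // identical on every replay
  }
}
```

Because a replayed window yields exactly the same tuples, every downstream computation sees identical input after recovery, which is what lets an exactly-once output operator safely discard already-published results.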
For cold restart, you would need to consider the property you mention and decide what is applicable to your use case.

Thomas

On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Ok, now I get it. Thanks for the nice explanation!

One more thing: you mentioned checkpointing the offset ranges so the replay happens in the same order from Kafka. Is there any property we need to configure to do that, like initialOffset set to APPLICATION_OR_LATEST?

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system; in this case it would rather be "produce exactly once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because the input operator is configured to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent and the output operator can discard the results that were already published instead of producing duplicates.

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

I think this is a customized operator implementation that takes care of exactly-once processing at the output.

What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?
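For reference, the initialOffset property discussed above is typically set in the application's properties.xml. The following is a hedged sketch, assuming the Apex dt.operator.<name>.prop.* convention and a Kafka input operator named "kafkaInput" (the operator name is hypothetical; the APPLICATION_OR_* values are the ones the question refers to):

```xml
<!-- Sketch of a properties.xml entry; "kafkaInput" is a hypothetical operator name. -->
<property>
  <name>dt.operator.kafkaInput.prop.initialOffset</name>
  <value>APPLICATION_OR_EARLIEST</value>
</property>
```

With an APPLICATION_OR_* setting, the operator resumes from its checkpointed offsets on relaunch and falls back to the earliest or latest offset only on a cold start.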
Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

In that case please have a look at:

https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java

The operator will ensure that messages are not duplicated, under the stated assumptions.

On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

In our case we are writing the results back to a MapR Streams topic, based on some validations.

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:

Hi,

Which operators in your application are writing to external systems?

When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.

Thomas
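The discard behavior described in the thread can be illustrated with a small self-contained sketch. This is not the Malhar implementation: DiscardingOutputSketch and all of its members are hypothetical, and it simplifies by only discarding windows at or below the last committed window (the real operator also reads the topic back to reconcile a window that was partially written before the failure, which is why a producer on MapR Streams can also act as a consumer):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: with idempotent replay upstream, an output operator can
// discard tuples from replayed windows that were already committed, instead of
// publishing duplicates. Idempotency guarantees the discarded tuples are
// identical to what was already published.
class DiscardingOutputSketch {
  private final List<String> published = new ArrayList<>(); // stands in for the output topic
  private long committedWindowId = -1;                      // recovered state after failure
  private long currentWindowId = -1;

  void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  void process(String tuple) {
    // Replayed window whose output was already committed: discard, don't re-send.
    if (currentWindowId <= committedWindowId) {
      return;
    }
    published.add(tuple);                                   // producer.send(...) in reality
  }

  void committed(long windowId) {                           // checkpoint-committed callback
    committedWindowId = windowId;
  }

  List<String> getPublished() {
    return published;
  }
}
```

For example, if window 1 emits "m1" and "m2", is committed, and is then replayed after a failure, the replayed tuples are silently dropped and only new windows add to the output, so the topic sees each message once.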