So I just changed the malhar-kafka version to 3.5.0, and I was able to import the AbstractOutputOperator. Let me try to launch it now.
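For reference, the dependency change would look roughly like the fragment below. This is only a sketch: the artifactId and version come from this thread, the groupId is assumed to be the usual Apache Apex one (org.apache.apex), and scope or exclusions are left out because they depend on the project.

```xml
<!-- pom.xml fragment (sketch). groupId is an assumption; artifactId and
     version are the ones discussed in this thread. -->
<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>3.5.0</version>
</dependency>
```

malhar-kafka is a separate module from malhar-library, so this entry goes in addition to the malhar-library dependency rather than replacing it.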
Thanks for your inputs !!

On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

> Should we use malhar-library version 3.5 then ?
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>
>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>> This operator is not in malhar-library, it's a separate module.
>>
>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>
>>> Hi Siyuan,
>>>
>>> I am using the same Kafka producer as you mentioned. But I am not seeing
>>> the AbstractKafkaOutputOperator in malhar library while importing.
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>
>>>> Also, which Kafka output operator are you using?
>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
>>>> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>> with MapR stream; the latter only works with Kafka 0.8.* or 0.9.
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>
>>>>> Hey Jaspal,
>>>>>
>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>> from malhar? If so, please make sure the producer you use here is
>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>> kafka.javaapi.producer.Producer. That is the old API and it is not
>>>>> supported by MapR stream.
>>>>>
>>>>> Regards,
>>>>> Siyuan
>>>>>
>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Thomas,
>>>>>>
>>>>>> Below is the operator implementation we are trying to run. This
>>>>>> operator is getting an object of tenant class from the upstream operator.
>>>>>>
>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>
>>>>>>     private static final Logger LOG =
>>>>>>         LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>
>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>
>>>>>>         Gson gson = new Gson();
>>>>>>
>>>>>>         @Override
>>>>>>         public void process(Tenant tenant) {
>>>>>>
>>>>>>             try {
>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>                 long now = System.currentTimeMillis();
>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>                 if (tenant != null) {
>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>                     } else {
>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>
>>>>>>                     }
>>>>>>                     producer.flush();
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>
>>>>>> After building the application, it throws error during launch:
>>>>>>
>>>>>> An error occurred trying to launch the application.
>>>>>> Server message:
>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>>     at
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thomas,
>>>>>>>
>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>
>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do we
>>>>>>> need to specify <String, T> ? Since we are getting an object of class
>>>>>>> type from the previous operator.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> Are you referring to the upstream operator in the DAG or the state
>>>>>>>> of the previous application after relaunch? Since the data is stored in
>>>>>>>> MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>> Please clarify your question.
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> I have a question, so when we are using
>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>>>>> maprstream topic will it be able to read messages from the previous
>>>>>>>>> operator ?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>
>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>
>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>
>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>>>>>>>
>>>>>>>>>>> One more thing, so you mentioned checkpointing the offset
>>>>>>>>>>> ranges to replay in the same order from Kafka.
>>>>>>>>>>>
>>>>>>>>>>> Is there any property we need to configure to do that? Like
>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why
>>>>>>>>>>>> we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>> at-least-once, the default behavior.
>>>>>>>>>>>> Because in the input operator you have
>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What if any previous operators fail ? How we can make sure
>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> which operators in your application are writing to external systems?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The results
>>>>>>>>>>>>>>>> are written to JDBC. That operator by itself supports exactly-once
>>>>>>>>>>>>>>>> through transactions (in conjunction with idempotent input), hence there
>>>>>>>>>>>>>>>> is no need to configure the processing mode at all.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas