Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This operator is not in malhar-library; it lives in a separate module.
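For reference, the dependency would look roughly like this in pom.xml (a sketch: the group and artifact ids below are those of the Apache Apex Malhar Kafka module, and the version shown is an assumed 3.5.x release that you should align with the one you actually use):

    <dependency>
      <groupId>org.apache.apex</groupId>
      <artifactId>malhar-kafka</artifactId>
      <version>3.5.0</version>
    </dependency>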
On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Siyuan,

I am using the same Kafka producer as you mentioned, but I am not seeing AbstractKafkaOutputOperator in the malhar library when I try to import it.

Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Also, which Kafka output operator are you using? Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator. Only the org.apache.apex.malhar.kafka operator works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from malhar? If so, please make sure the producer you use is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. The latter is the old API and is not supported by MapR Streams.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

Below is the operator implementation we are trying to run. This operator gets an object of the Tenant class from the upstream operator.

    public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

        private static final Logger LOG =
            LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

        public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

            private final Gson gson = new Gson();

            @Override
            public void process(Tenant tenant) {
                try {
                    Producer<String, String> producer = getKafkaProducer();
                    if (tenant != null) {
                        if (StringUtils.isNotEmpty(tenant.getGl())) {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        } else {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        }
                        producer.flush();
                    }
                } catch (Exception e) {
                    LOG.error("Failed to publish tenant record", e);
                }
            }
        };
    }

After building the application, it throws an error during launch:

    An error occurred trying to launch the application. Server message:
    java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
    java.lang.Class.getDeclaredFields0(Native Method) at
    java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
    java.lang.Class.getDeclaredFields(Class.java:1916) at ...

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

I was trying to refer to the input from the previous operator.

Another thing: when we extend AbstractKafkaOutputOperator, do we need to specify <String, T>, since we are getting an object of a class type from the previous operator?

Thanks
Jaspal
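For reference, below is a minimal, self-contained sketch of the producer usage Siyuan recommends in this thread, i.e. the new org.apache.kafka.clients.producer API rather than the old kafka.javaapi.producer.Producer. The stream path, key, value and property settings are illustrative assumptions, not taken from the application above.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewApiProducerSketch {

      public static void main(String[] args) {
        Properties props = new Properties();
        // Required by the Kafka client; MapR Streams resolves the stream from the
        // "/stream-path:topic" name itself (assumption based on this thread).
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer and releases its resources.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
          producer.send(new ProducerRecord<>("/sample-stream:certUpdate",
              "volume-1", "{\"gl\":\"1234\"}"));
          producer.flush();
        }
      }
    }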
On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:

Are you referring to the upstream operator in the DAG or to the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

I have a question: when we use KafkaSinglePortExactlyOnceOutputOperator to write results into a MapR Streams topic, will it be able to read messages from the previous operator?

Thanks
Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:

For recovery you need to set the window data manager, like so:

https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33

That will also apply to a stateful restart of the entire application (relaunch from the previous instance's checkpointed state).

For a cold restart, you would need to consider the property you mention and decide what is applicable to your use case.

Thomas

On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Ok, now I get it. Thanks for the nice explanation!!

One more thing: you mentioned checkpointing the offset ranges so the replay from Kafka happens in the same order. Is there any property we need to configure to do that, such as setting initialOffset to APPLICATION_OR_LATEST?

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system; in this case it would rather be "produce exactly once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because the input operator is configured to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent, and the output operator can discard the results that were already published instead of producing duplicates.

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

I think this is a customized operator implementation that takes care of exactly-once processing at the output.

What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?

Thanks
Jaspal
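As a companion to the window data manager link above, here is a rough sketch of the input-side wiring for idempotent replay (window data manager plus the initialOffset property Jaspal asks about). It is a sketch under assumptions: the operator, setter, and port names follow the 0.9 Kafka input operator in the malhar-kafka module as discussed in this thread and should be checked against the version you depend on; the topic path and the console sink are placeholders for the real downstream operators.

    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
    import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.ConsoleOutputOperator;

    public class IdempotentInputSketch implements StreamingApplication {

      @Override
      public void populateDAG(DAG dag, Configuration conf) {
        KafkaSinglePortInputOperator input =
            dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());
        // Placeholder topic; for MapR Streams it takes the "/stream-path:topic" form.
        input.setTopics("/sample-stream:certUpdate");
        // Resume from the stored offsets on restart (the property mentioned above).
        input.setInitialOffset("APPLICATION_OR_LATEST");
        // Persist per-window offset ranges so replay after failure is idempotent.
        input.setWindowDataManager(new FSWindowDataManager());

        // Placeholder sink; the exactly-once Kafka output operator from this thread
        // would take its place in the real application.
        ConsoleOutputOperator sink =
            dag.addOperator("console", new ConsoleOutputOperator());
        dag.addStream("records", input.outputPort, sink.input);
      }
    }

With this in place, the windows replayed after a failure carry the same offset ranges as the original run, which is exactly what allows an exactly-once output operator to discard results it already published.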
On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

In that case please have a look at:

https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java

The operator will ensure that messages are not duplicated, under the stated assumptions.

On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

In our case we are writing the results back to a MapR Streams topic, based on some validations.

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:

Hi,

Which operators in your application are writing to external systems?

When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is a Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.

Thomas
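To illustrate the JDBC half of that example, here is a conceptual sketch (not the Malhar operator itself) of how transactions give exactly-once output when combined with idempotent input: the data rows and the last-committed window id are written in one transaction, so on recovery any replayed window at or below that id can be skipped. Table, column, and class names are illustrative assumptions.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class JdbcWindowTransactionSketch {

      private final Connection con;

      public JdbcWindowTransactionSketch(String jdbcUrl) throws SQLException {
        con = DriverManager.getConnection(jdbcUrl);
        con.setAutoCommit(false);             // one transaction per streaming window
      }

      // Returns the last window id whose output made it into the database;
      // replayed windows at or below this id are skipped by the caller.
      public long committedWindowId(String appId, int operatorId) throws SQLException {
        try (PreparedStatement st = con.prepareStatement(
            "SELECT win_id FROM dt_meta WHERE app_id = ? AND operator_id = ?")) {
          st.setString(1, appId);
          st.setInt(2, operatorId);
          try (ResultSet rs = st.executeQuery()) {
            return rs.next() ? rs.getLong(1) : -1L;
          }
        }
      }

      // Called at the end of a window that was not skipped: the data rows were
      // already added to the open transaction; now bump the window id and commit,
      // so data and window id become visible atomically.
      public void commitWindow(String appId, int operatorId, long windowId) throws SQLException {
        try (PreparedStatement st = con.prepareStatement(
            "UPDATE dt_meta SET win_id = ? WHERE app_id = ? AND operator_id = ?")) {
          st.setLong(1, windowId);
          st.setString(2, appId);
          st.setInt(3, operatorId);
          st.executeUpdate();
        }
        con.commit();
      }
    }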