Jaspal,

Topic is a mandatory property that you have to set. For MapR Streams, the value should be the full stream path followed by the topic name, for example: /your/stream/path:topicname
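A minimal sketch of how that value is put together, in plain Java with no Apex or MapR dependencies. `MapRTopicName.of` is a hypothetical helper: it joins an absolute stream path and a topic name with a colon and refuses null or malformed input, which is the situation the "may not be null" constraint violation complains about. In the application itself the resulting string would presumably be assigned to the operator's topic property (for example through a setTopic call or a dt.operator.kafkaOut.prop.topic entry in properties.xml); treat those two names as assumptions to verify against malhar-kafka 3.5.

```java
import java.util.Objects;

/**
 * Hypothetical helper, not part of Malhar or MapR: builds a MapR Streams topic
 * name of the form "/stream/path:topic".
 */
final class MapRTopicName {
  private MapRTopicName() {}

  static String of(String streamPath, String topic) {
    // Fail fast instead of producing null, which is what the operator's
    // "topic may not be null" constraint rejects at launch.
    Objects.requireNonNull(streamPath, "streamPath may not be null");
    Objects.requireNonNull(topic, "topic may not be null");
    if (!streamPath.startsWith("/")) {
      throw new IllegalArgumentException("stream path must be absolute: " + streamPath);
    }
    // Assumption for this sketch: a colon inside the topic name would make the path ambiguous.
    if (topic.isEmpty() || topic.contains(":")) {
      throw new IllegalArgumentException("invalid topic name: " + topic);
    }
    return streamPath + ":" + topic;
  }

  public static void main(String[] args) {
    System.out.println(of("/your/stream/path", "topicname"));
  }
}
```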

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

> After making the change, we are getting the error below while launching the application:
>
> An error occurred trying to launch the application. Server message: javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f, propertyPath='topic', message='may not be null', ...
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>
>> So I just changed the malhar-kafka version to 3.5.0 and was able to import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>
>> Thanks for your inputs!!
>>
>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>
>>> Should we use malhar-library version 3.5 then?
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This operator is not in malhar-library; it's a separate module.
>>>>
>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Hi Siyuan,
>>>>>
>>>>> I am using the same Kafka producer as you mentioned, but I am not seeing AbstractKafkaOutputOperator in malhar-library when importing.
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>
>>>>>> Also, which Kafka output operator are you using?
>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
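When it is unclear which of the two operator classes a build actually sees, probing the classpath by name answers the question directly. This is a minimal JDK-only sketch; `ClasspathProbe` is a hypothetical name, and the candidate class names assume the abstract operator is called `AbstractKafkaOutputOperator` in both the malhar-kafka and the contrib packages. Run it with the same classpath as the application.

```java
/** Hypothetical diagnostic: reports whether a class can be loaded from the current classpath. */
final class ClasspathProbe {
  private ClasspathProbe() {}

  static boolean isPresent(String className) {
    try {
      // initialize=false: locate and load the class without running its static initializers
      Class.forName(className, false, ClasspathProbe.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      // LinkageError covers NoClassDefFoundError, e.g. when a class the candidate depends on is missing
      return false;
    }
  }

  public static void main(String[] args) {
    String[] candidates = {
      "org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator",  // malhar-kafka module
      "com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator", // contrib module
      "org.apache.kafka.clients.producer.KafkaProducer",           // new producer API
      "kafka.javaapi.producer.Producer"                            // old producer API
    };
    for (String name : candidates) {
      System.out.println(name + " -> " + (isPresent(name) ? "found" : "missing"));
    }
  }
}
```

If the malhar-kafka class is reported missing, the pom.xml most likely lacks the separate malhar-kafka 3.5 dependency mentioned in this thread.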
>>>>>> Only org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Jaspal,
>>>>>>>
>>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make sure the producer you use there is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. That is the old API, and it is not supported by MapR Streams.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> Below is the operator implementation we are trying to run. This operator receives an object of the Tenant class from the upstream operator.
>>>>>>>>
>>>>>>>>     public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>
>>>>>>>>         private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>
>>>>>>>>         public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>
>>>>>>>>             Gson gson = new Gson();
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void process(Tenant tenant) {
>>>>>>>>
>>>>>>>>                 try {
>>>>>>>>                     Producer<String, String> producer = getKafkaProducer();
>>>>>>>>                     //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>                     long now = System.currentTimeMillis();
>>>>>>>>                     //Configuration conf = HBaseConfiguration.create();
>>>>>>>>                     //TenantDao dao = new TenantDao(conf);
>>>>>>>>                     //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>                     if (tenant != null) {
>>>>>>>>                         //Tenant tenant = tenant.next();
>>>>>>>>                         if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>                             producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>                             //puts.add(dao.mkPut(tenant));
>>>>>>>>                         } else {
>>>>>>>>                             producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>                         }
>>>>>>>>                         producer.flush();
>>>>>>>>                     }
>>>>>>>>                 }
>>>>>>>>                 // (catch block and remaining closing braces omitted in the original message)
>>>>>>>>
>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>
>>>>>>>> An error occurred trying to launch the application. Server message:
>>>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>>>>     at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>>>>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>>>>     at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>>>>     ...
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> I was referring to the input from the previous operator.
>>>>>>>>>
>>>>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need to specify <String, T>, since we are getting an object of a class type from the previous operator?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Are you referring to the upstream operator in the DAG or to the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.
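On the `<String, T>` question, a JDK-only sketch. It assumes, as the malhar-kafka sources appear to do, that the base class's type parameters are the key and value types of the records sent to Kafka, with `getKafkaProducer()` returning a producer of the same types. Because the operator quoted above serializes every Tenant to a JSON string before sending, both parameters would be String; the Tenant type belongs on the input port, not on the base class. (Malhar's own exactly-once operator appears to be declared with `<String, T>` because it sends the incoming tuple itself as the value, relying on a configured value serializer.) `OutputSketch`, `Sent`, `Tenant` and `TenantRouter` are hypothetical stand-ins, and the routing rule is the one from the quoted `process()` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for the upstream payload, with the two getters used in the thread. */
final class Tenant {
  private final String gl;
  private final String volumeName;

  Tenant(String gl, String volumeName) {
    this.gl = gl;
    this.volumeName = volumeName;
  }

  String getGl() { return gl; }
  String getVolumeName() { return volumeName; }
}

/** Hypothetical stand-in for a Kafka output base class: K and V are the record key/value types. */
abstract class OutputSketch<K, V> {
  /** One record handed to the (simulated) producer. */
  static final class Sent<K, V> {
    final String topic;
    final K key;
    final V value;

    Sent(String topic, K key, V value) {
      this.topic = topic;
      this.key = key;
      this.value = value;
    }
  }

  final List<Sent<K, V>> sent = new ArrayList<>();  // records "published" so far

  protected void send(String topic, K key, V value) {
    sent.add(new Sent<>(topic, key, value));
  }
}

/** Tenants come in on the port, but keys and values go out as Strings, hence <String, String>. */
final class TenantRouter extends OutputSketch<String, String> {
  private final String okTopic;
  private final String errorTopic;
  private final Function<Tenant, String> serializer;  // e.g. gson::toJson in the real operator

  TenantRouter(String okTopic, String errorTopic, Function<Tenant, String> serializer) {
    this.okTopic = okTopic;
    this.errorTopic = errorTopic;
    this.serializer = serializer;
  }

  void process(Tenant tenant) {
    if (tenant == null) {
      return;
    }
    String gl = tenant.getGl();
    // Same rule as StringUtils.isNotEmpty(tenant.getGl()) in the quoted operator.
    String topic = (gl != null && !gl.isEmpty()) ? okTopic : errorTopic;
    send(topic, tenant.getVolumeName(), serializer.apply(tenant));
  }

  public static void main(String[] args) {
    TenantRouter router = new TenantRouter("/your/stream/path:certUpdate", "/your/stream/path:error",
        t -> "{\"volumeName\":\"" + t.getVolumeName() + "\"}");  // toy serializer, no escaping
    router.process(new Tenant("GL-1", "vol1"));  // non-empty gl: certUpdate topic
    router.process(new Tenant("", "vol2"));      // empty gl: error topic
    for (Sent<String, String> s : router.sent) {
      System.out.println(s.topic + " key=" + s.key + " value=" + s.value);
    }
  }
}
```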
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I have a question: when we use *KafkaSinglePortExactlyOnceOutputOperator* to write results into a MapR Streams topic, will it be able to read messages from the previous operator?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>
>>>>>>>>>>>> That will also apply to a stateful restart of the entire application (relaunch from the previous instance's checkpointed state).
>>>>>>>>>>>>
>>>>>>>>>>>> For a cold restart, you would need to consider the property you mention and decide what is applicable to your use case.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> OK, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there any property we need to configure to do that, like setting initialOffset to APPLICATION_OR_LATEST?
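For the cold-restart case, a hypothetical model of how such a setting picks the first offset to read in each partition. The enum constants borrow the initialOffset names from the question, but the code only illustrates the rule "resume from the offset this application committed, otherwise fall back to the earliest or latest offset"; it is not Malhar's implementation. On a stateful relaunch, the checkpointed state described above would presumably take precedence over any such setting.

```java
import java.util.OptionalLong;

/** Hypothetical model of a start-offset policy; names mirror the thread, logic is illustrative. */
enum StartPolicy {
  EARLIEST, LATEST, APPLICATION_OR_EARLIEST, APPLICATION_OR_LATEST;

  /**
   * @param committed offset this application committed earlier for the partition, if any
   * @param earliest  first offset still retained in the partition
   * @param latest    offset that the next new message will receive
   */
  long startOffset(OptionalLong committed, long earliest, long latest) {
    switch (this) {
      case EARLIEST:
        return earliest;
      case LATEST:
        return latest;
      case APPLICATION_OR_EARLIEST:
        return committed.orElse(earliest);
      case APPLICATION_OR_LATEST:
        return committed.orElse(latest);
      default:
        throw new AssertionError("unknown policy " + this);
    }
  }

  public static void main(String[] args) {
    // Cold start with nothing committed: skip the backlog and start at the head.
    System.out.println(APPLICATION_OR_LATEST.startOffset(OptionalLong.empty(), 0, 500));
    // Restart after a previous run committed offset 120: resume there.
    System.out.println(APPLICATION_OR_LATEST.startOffset(OptionalLong.of(120), 0, 500));
  }
}
```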
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system. In this case it would rather be "produce exactly-once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because you have configured the input operator to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent and the output operator can discard the results that were already published instead of producing duplicates.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this is what is called a customized operator implementation, which takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What if any of the previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?
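The replay half of Thomas's explanation can be modelled in a few lines. In this toy sketch (hypothetical names, not Apex code) the input records which offset range it emitted in each window, standing in for the checkpointed offset ranges. A replayed window therefore contains exactly the same messages as the first time, even if more data has arrived since, which is what lets a downstream output recognise work it has already published.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of an idempotent input: per-window offset ranges make replays deterministic. */
final class IdempotentInputSketch {
  private final List<String> partition;                     // append-only log, offset = index
  private final Map<Long, long[]> ranges = new HashMap<>();  // windowId -> {start, end exclusive}
  private long nextOffset = 0;

  // In a real operator the ranges would have to be persisted (for Apex, presumably through
  // the window data manager) rather than kept in memory as they are here.
  IdempotentInputSketch(List<String> partition) {
    this.partition = partition;
  }

  /** Normal processing: take up to maxPerWindow available messages and record the range. */
  List<String> emitWindow(long windowId, int maxPerWindow) {
    long start = nextOffset;
    long end = Math.min(partition.size(), start + maxPerWindow);
    ranges.put(windowId, new long[] {start, end});
    nextOffset = end;
    return read(start, end);
  }

  /** Recovery: re-emit exactly the recorded range for a window that is being replayed. */
  List<String> replayWindow(long windowId) {
    long[] range = ranges.get(windowId);
    if (range == null) {
      throw new IllegalStateException("no recorded range for window " + windowId);
    }
    return read(range[0], range[1]);
  }

  private List<String> read(long start, long end) {
    return new ArrayList<>(partition.subList((int) start, (int) end));
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>(Arrays.asList("a", "b", "c"));
    IdempotentInputSketch input = new IdempotentInputSketch(log);
    System.out.println(input.emitWindow(1, 2));  // first window
    System.out.println(input.emitWindow(2, 2));  // second window, only one message available
    log.add("d");                                // new data arrives, then a failure forces a replay
    System.out.println(input.replayWindow(2));   // same content as the first time window 2 ran
  }
}
```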
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In our case we are writing the results back to a MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Which operators in your application are writing to external systems?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.
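The JDBC case can be sketched in the same spirit. This toy model (hypothetical names, not the Malhar JDBC operator) commits each window's rows together with that window's id, as a single database transaction would, so rows from a replayed window that is already committed are dropped. It relies on the input replaying identical windows, i.e. the idempotent input described in this thread. A real JDBC operator would presumably keep the last committed window id in a separate table that is updated in the same transaction as the data.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transactional sink: rows and their window id are committed together. */
final class TransactionalSinkSketch {
  // Committed state: changes only inside commit(), both fields together.
  final List<String> table = new ArrayList<>();
  long committedWindow = -1;

  // Uncommitted rows of the current window; lost if the window never commits.
  private final List<String> pending = new ArrayList<>();
  private long currentWindow = -1;

  void beginWindow(long windowId) {
    currentWindow = windowId;
    pending.clear();
  }

  void process(String row) {
    if (currentWindow <= committedWindow) {
      return;  // replay of a window whose rows are already in the table
    }
    pending.add(row);
  }

  void endWindow() {
    if (currentWindow > committedWindow) {
      commit();
    }
  }

  /** Stands in for one database transaction covering both the rows and the window id. */
  private void commit() {
    table.addAll(pending);
    committedWindow = currentWindow;
    pending.clear();
  }

  public static void main(String[] args) {
    TransactionalSinkSketch sink = new TransactionalSinkSketch();
    sink.beginWindow(1); sink.process("r1"); sink.process("r2"); sink.endWindow();
    sink.beginWindow(2); sink.process("r3");  // failure here: window 2 never commits
    // A real operator would restart and read committedWindow back from the database;
    // the sketch keeps one object for brevity while upstream replays from window 1.
    sink.beginWindow(1); sink.process("r1"); sink.process("r2"); sink.endWindow();
    sink.beginWindow(2); sink.process("r3"); sink.endWindow();
    System.out.println(sink.table);  // each row appears once
  }
}
```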
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thomas