Jaspal, I think you missed the kafkaOut :)

Regards,
Siyuan
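(For context, a minimal sketch of the properties entry Siyuan is pointing at, assuming the output operator is named kafkaOut in the DAG, as in the validation error further down, and reusing a stream path from the posted operator code. The dt.operator.<name>.prop.<property> form is the usual Apex convention, but verify the prefix against your own configuration.)

  <!-- operator name "kafkaOut" and stream path taken from this thread; adjust to your application -->
  <property>
    <name>dt.operator.kafkaOut.prop.topic</name>
    <value>/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate</value>
  </property>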
On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Siyuan,

That's how we have given it in the properties file:

[image: Inline image 1]

Thanks!!

On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Jaspal,

Topic is a mandatory property you have to set. In MapR, the value should be set to the full stream path, for example: /your/stream/path:streamname

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

After making the change, we are getting the below error at application launch:

An error occurred trying to launch the application. Server message:
javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints
[ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
propertyPath='topic', message='may not be null', ...

Thanks!!

On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

So I just changed the malhar-kafka version to 3.5.0 and was able to import the AbstractOutputOperator. Let me try to launch it now.

Thanks for your inputs!!

On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Should we use malhar-library version 3.5 then?

Thanks!!

On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:

Please make sure to depend on version 3.5 of malhar-kafka in pom.xml [a dependency sketch follows at the end of this message block]. This operator is not in malhar-library, it's a separate module.

On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Siyuan,

I am using the same Kafka producer as you mentioned, but I am not seeing the AbstractKafkaOutputOperator in the Malhar library on import.

Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Also, which Kafka output operator are you using?
Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead of com.datatorrent.contrib.kafka.AbstractOutputOperator. Only org.apache.apex.malhar.kafka.AbstractOutputOperator works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make sure the producer you use here is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. That is the old API and it is not supported by MapR Streams.

Regards,
Siyuan
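(A sketch of the dependency Thomas refers to above; the artifactId and version are from this thread, while the groupId is assumed from the Apache Apex Malhar 3.5.x releases and should be verified against the actual artifact coordinates.)

  <!-- malhar-kafka is a separate module from malhar-library; groupId assumed, verify against the release -->
  <dependency>
    <groupId>org.apache.apex</groupId>
    <artifactId>malhar-kafka</artifactId>
    <version>3.5.0</version>
  </dependency>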
On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

Below is the operator implementation we are trying to run. This operator gets an object of the Tenant class from the upstream operator.

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG =
        LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {
            try {
                Producer<String, String> producer = getKafkaProducer();
                //ObjectMapper mapper = new ObjectMapper();
                long now = System.currentTimeMillis();
                //Configuration conf = HBaseConfiguration.create();
                //TenantDao dao = new TenantDao(conf);
                //ArrayList<Put> puts = new ArrayList<>();
                if (tenant != null) {
                    //Tenant tenant = tenant.next();
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                        //puts.add(dao.mkPut(tenant));
                    } else {
                        producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                    }
                    producer.flush();
                }
            } catch (Exception e) {
                // surface failures so the platform can redeploy the operator and reprocess from checkpoint
                throw new RuntimeException(e);
            }
        }
    };
}

After building the application, it throws this error during launch:

An error occurred trying to launch the application. Server message:
java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at ...

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

I was trying to refer to the input from the previous operator.

Another thing: when we extend the AbstractKafkaOutputOperator, do we need to specify <String, T>? Since we are getting an object of a class type from the previous operator.

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:

Are you referring to the upstream operator in the DAG or the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

I have a question: when we are using KafkaSinglePortExactlyOnceOutputOperator to write results into a MapR Streams topic, will it be able to read messages from the previous operator?

Thanks
Jaspal
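(A minimal sketch of how the posted operator could be wired to its upstream operator in an application, illustrating Thomas's clarification above that the output operator receives its tuples from the upstream operator over a DAG stream. The TenantProcessor operator and its "out" port are hypothetical placeholders; KafkaSinglePortExactlyOnceOutputOperator and its "in" port are the ones posted above.)

  import org.apache.hadoop.conf.Configuration;
  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;

  public class Application implements StreamingApplication {
    @Override
    public void populateDAG(DAG dag, Configuration conf) {
      // Hypothetical upstream operator that emits Tenant objects on a port named "out".
      TenantProcessor processor = dag.addOperator("tenantProcessor", new TenantProcessor());
      // The custom output operator posted above; its input port is named "in".
      KafkaSinglePortExactlyOnceOutputOperator kafkaOut =
          dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator());
      // Tenant tuples reach kafkaOut over this stream; any read-back from the MapR Streams
      // topic (for exactly-once recovery) happens inside the output operator, not via the DAG.
      dag.addStream("tenants", processor.out, kafkaOut.in);
    }
  }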
On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:

For recovery you need to set the window data manager like so:

https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33

[a short sketch follows at the end of this message block]

That will also apply to stateful restart of the entire application (relaunch from the previous instance's checkpointed state).

For cold restart, you would need to consider the property you mention and decide what is applicable to your use case.

Thomas

On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Ok, now I get it. Thanks for the nice explanation!!

One more thing: you mentioned checkpointing the offset ranges so the replay happens in the same order from Kafka. Is there any property we need to configure to do that, like initialOffset set to APPLICATION_OR_LATEST?

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system; in this case it would rather be "produce exactly once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because the input operator is configured to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent and the output operator can discard the results that were already published instead of producing duplicates.

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

I think this is something called a customized operator implementation that takes care of exactly-once processing at the output.

What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?

Thanks
Jaspal
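(A short sketch of the two settings discussed above, i.e. the window data manager Thomas links to and the initialOffset property Jaspal asks about. It assumes the 0.9-style KafkaSinglePortInputOperator from the malhar-kafka module; the setter names should be verified against the version actually in use.)

  import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
  import org.apache.apex.malhar.lib.wal.FSWindowDataManager;

  // inside populateDAG(DAG dag, Configuration conf):
  KafkaSinglePortInputOperator kafkaIn =
      dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
  // Replay tuples in the same order/windows after a failure (idempotent input);
  // this is what lets the exactly-once output operator discard already-published results.
  kafkaIn.setWindowDataManager(new FSWindowDataManager());
  // For a cold restart, choose where consumption starts; APPLICATION_OR_LATEST resumes from
  // the offsets stored for this application if they exist, otherwise from the latest offset.
  kafkaIn.setInitialOffset("APPLICATION_OR_LATEST");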
On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

In that case please have a look at:

https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java

The operator will ensure that messages are not duplicated, under the stated assumptions.

On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

In our case we are writing the results back to a MapR Streams topic based on some validations.

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:

Hi,

Which operators in your application are writing to external systems?

When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.

Thomas