Since the operator requires the topic property as part of its configuration, you can just supply a dummy value; it won't have any further effect.
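The reply above suggests that a dummy value works because, in the custom operator quoted below, the topic property only has to pass the launch-time "may not be null" check, while process() names the destination stream on every record. The following library-free Java sketch models that split. TenantRouter and the trimmed-down Tenant class are hypothetical stand-ins written for illustration, not Apex, Malhar, or Kafka classes, and Objects.requireNonNull only approximates the javax.validation constraint.

```java
import java.util.Objects;

/** Hypothetical stand-in for the application's Tenant class, reduced to the two fields used here. */
final class Tenant {
  final String gl;
  final String volumeName;

  Tenant(String gl, String volumeName) {
    this.gl = gl;
    this.volumeName = volumeName;
  }
}

/**
 * Hypothetical router (not a Malhar or Kafka API) separating the required-but-unused
 * "topic" setting from the per-record choice of destination stream.
 */
final class TenantRouter {
  static final String STREAM =
      "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream";
  static final String CERT_UPDATE = STREAM + ":certUpdate";
  static final String ERROR = STREAM + ":error";

  private final String topic;

  TenantRouter(String topic) {
    // Approximates the launch-time "may not be null" constraint: any non-null value passes,
    // including a placeholder that the routing below never reads.
    this.topic = Objects.requireNonNull(topic, "topic may not be null");
  }

  String configuredTopic() {
    return topic;
  }

  /** Same rule as the quoted process(): a non-empty gl goes to certUpdate, anything else to error. */
  String destinationFor(Tenant tenant) {
    boolean hasGl = tenant.gl != null && !tenant.gl.isEmpty();
    return hasGl ? CERT_UPDATE : ERROR;
  }
}
```

For example, new TenantRouter("placeholder") routes a Tenant with an empty gl to the ...:error stream, while new TenantRouter(null) fails immediately with a NullPointerException, loosely mirroring the constraint violation reported at launch.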

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Siyuan,
>
> So for the output operator, we have specified the topic as part of the logic itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
>
>   private static final Logger LOG =
>       LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>   public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>     Gson gson = new Gson();
>
>     @Override
>     public void process(Tenant tenant) {
>
>       try {
>         Producer<String, String> producer = getKafkaProducer();
>         //ObjectMapper mapper = new ObjectMapper();
>         long now = System.currentTimeMillis();
>         //Configuration conf = HBaseConfiguration.create();
>         //TenantDao dao = new TenantDao(conf);
>         //ArrayList<Put> puts = new ArrayList<>();
>         if (tenant != null) {
>           //Tenant tenant = tenant.next();
>           if (StringUtils.isNotEmpty(tenant.getGl())) {
>             producer.send(new ProducerRecord<String, String>(
>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>                 tenant.getVolumeName(), gson.toJson(tenant)));
>             //puts.add(dao.mkPut(tenant));
>           } else {
>             producer.send(new ProducerRecord<String, String>(
>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>                 tenant.getVolumeName(), gson.toJson(tenant)));
>
>           }
>           producer.flush();
>         }
>       }
>
> Thanks!!

On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
> Jaspal,
>
> I think you missed the kafkaOut :)
>
> Regards,
> Siyuan

On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Siyuan,
>
> That's how we have set it in the properties file:
>
> [image: inline screenshot of the properties file, not preserved]
>
> Thanks!!

On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
> Jaspal,
>
> Topic is a mandatory property you have to set.
> In MapR, the value should be set to the full stream path, for example:
> /your/stream/path:streamname
>
> Regards,
> Siyuan

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> After making the change, we get the error below at application launch:
>
> An error occurred trying to launch the application. Server message:
> javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints
> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
> propertyPath='topic', message='may not be null',
>
> Thanks!!

On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> So I just changed the malhar-kafka version to 3.5.0 and was able to import
> the AbstractKafkaOutputOperator. Let me try to launch it now.
>
> Thanks for your inputs!!

On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Should we use malhar-library version 3.5 then?
>
> Thanks!!

On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
> This operator is not in malhar-library; it's a separate module.

On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Hi Siyuan,
>
> I am using the same Kafka producer you mentioned, but I don't see
> AbstractKafkaOutputOperator in the malhar library when importing.
>
> Thanks!!
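Siyuan's note that on MapR the topic value must be the full stream path plus the topic name (/your/stream/path:streamname) is easy to get wrong in a properties file. A small pre-launch check could catch a malformed value. The sketch below uses a hypothetical MaprTopic helper (not part of Malhar or the MapR client), and its validation rules are assumptions of this sketch: an absolute stream path, a ":" separator, and a non-empty topic name.

```java
import java.util.Optional;

/**
 * Hypothetical helper (not a MapR or Malhar API) that splits a MapR Streams topic value
 * of the form "/stream/path:topicName". The validation rules are assumptions of this sketch.
 */
final class MaprTopic {
  final String streamPath;
  final String topicName;

  private MaprTopic(String streamPath, String topicName) {
    this.streamPath = streamPath;
    this.topicName = topicName;
  }

  static Optional<MaprTopic> parse(String value) {
    if (value == null) {
      return Optional.empty();
    }
    int sep = value.lastIndexOf(':');
    if (sep <= 0 || sep == value.length() - 1) {
      // No separator, empty stream path, or empty topic name.
      return Optional.empty();
    }
    String stream = value.substring(0, sep);
    String topic = value.substring(sep + 1);
    if (!stream.startsWith("/")) {
      // This sketch only accepts absolute stream paths.
      return Optional.empty();
    }
    return Optional.of(new MaprTopic(stream, topic));
  }
}
```

For example, MaprTopic.parse("/your/stream/path:streamname") yields the stream path /your/stream/path and the topic streamname, while a bare streamname is rejected.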

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
> Also, which Kafka output operator are you using?
> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
> Only org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works with
> MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>
> Regards,
> Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:
> Hey Jaspal,
>
> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
> from Malhar? If so, please make sure the producer you use here is
> org.apache.kafka.clients.producer.KafkaProducer instead of
> kafka.javaapi.producer.Producer. That is the old API, and it is not
> supported by MapR Streams.
>
> Regards,
> Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Thomas,
>
> Below is the operator implementation we are trying to run. This operator
> receives an object of the Tenant class from the upstream operator.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>
>   private static final Logger LOG =
>       LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>   public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>     Gson gson = new Gson();
>
>     @Override
>     public void process(Tenant tenant) {
>
>       try {
>         Producer<String, String> producer = getKafkaProducer();
>         //ObjectMapper mapper = new ObjectMapper();
>         long now = System.currentTimeMillis();
>         //Configuration conf = HBaseConfiguration.create();
>         //TenantDao dao = new TenantDao(conf);
>         //ArrayList<Put> puts = new ArrayList<>();
>         if (tenant != null) {
>           //Tenant tenant = tenant.next();
>           if (StringUtils.isNotEmpty(tenant.getGl())) {
>             producer.send(new ProducerRecord<String, String>(
>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>                 tenant.getVolumeName(), gson.toJson(tenant)));
>             //puts.add(dao.mkPut(tenant));
>           } else {
>             producer.send(new ProducerRecord<String, String>(
>                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>                 tenant.getVolumeName(), gson.toJson(tenant)));
>
>           }
>           producer.flush();
>         }
>       }
>
> After building the application, it throws an error during launch:
>
> An error occurred trying to launch the application.
> Server message: java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>   at java.lang.Class.getDeclaredFields0(Native Method)
>   at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>   at java.lang.Class.getDeclaredFields(Class.java:1916)
>   at
>
> Thanks
> Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Thomas,
>
> I was trying to refer to the input from the previous operator.
>
> Another thing: when we extend AbstractKafkaOutputOperator, do we need to
> specify <String, T>, since we are getting an object of a class type from
> the previous operator?
>
> Thanks
> Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
> Are you referring to the upstream operator in the DAG or to the state of
> the previous application after relaunch? Since the data is stored in MapR
> Streams, an operator that is a producer can also act as a consumer.
> Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Hi Thomas,
>
> I have a question: when we use KafkaSinglePortExactlyOnceOutputOperator
> to write results into a MapR Streams topic, will it be able to read
> messages from the previous operator?
>
> Thanks
> Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
> For recovery you need to set the window data manager like so:
>
> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>
> That will also apply to a stateful restart of the entire application
> (relaunch from the previous instance's checkpointed state).
>
> For a cold restart, you would need to consider the property you mention
> and decide what is applicable to your use case.
>
> Thomas

On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> OK, now I get it. Thanks for the nice explanation!!
>
> One more thing: you mentioned checkpointing the offset ranges to replay
> in the same order from Kafka.
>
> Is there any property we need to configure to do that, like setting
> initialOffset to APPLICATION_OR_LATEST?
>
> Thanks
> Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
> What you want is the effect of exactly-once output (that's why we also call
> it end-to-end exactly-once). There is no such thing as exactly-once
> processing in a distributed system.
> In this case it would rather be "produce exactly-once". Upstream operators,
> on failure, will recover to checkpointed state and re-process the stream
> from there. This is at-least-once, the default behavior. Because you have
> configured the input operator to replay in the same order from Kafka (this
> is done by checkpointing the offset ranges), the computation in the DAG is
> idempotent, and the output operator can discard the results that were
> already published instead of producing duplicates.

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> I think this is a customized operator implementation that takes care of
> exactly-once processing at the output.
>
> What if any of the previous operators fail? How can we make sure they also
> recover using the EXACTLY_ONCE processing mode?
>
> Thanks
> Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:
> In that case please have a look at:
>
> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>
> The operator will ensure that messages are not duplicated, under the
> stated assumptions.

On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
> Hi Thomas,
>
> In our case we are writing the results back to a MapR Streams topic based
> on some validations.
>
> Thanks
> Jaspal

On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:
> Hi,
>
> Which operators in your application are writing to external systems?
>
> When you look at the example from the blog
> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
> there is Kafka input, which is configured to be idempotent. The results
> are written to JDBC.
> That operator by itself supports exactly-once through transactions (in
> conjunction with idempotent input), hence there is no need to configure
> the processing mode at all.
>
> Thomas
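The recovery argument in this thread (upstream operators replay from checkpointed state, the input replays the same Kafka offset ranges per window, and the output discards what was already committed) can be modeled without Apex. The sketch below is a simplified in-memory illustration under stated assumptions, not the Malhar JDBC or Kafka operator code: the hypothetical ReplayableInput remembers the offset range of each window (standing in for the window data manager), and the hypothetical TransactionalSink stores a window's rows together with its window ID, as a transactional JDBC output would commit both at once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical input model: remembers the offset range each window consumed, standing in for
 * the window data manager, so replaying a window after a failure reads exactly the same records.
 */
final class ReplayableInput {
  private final List<String> log;                          // stand-in for one Kafka partition
  private final Map<Long, int[]> ranges = new HashMap<>(); // windowId -> [start, end)
  private int nextOffset = 0;

  ReplayableInput(List<String> log) {
    this.log = log;
  }

  List<String> emit(long windowId, int maxRecords) {
    int[] range = ranges.get(windowId);
    if (range == null) {
      // First time this window is seen: fix its range and record it.
      int end = Math.min(nextOffset + maxRecords, log.size());
      range = new int[] {nextOffset, end};
      ranges.put(windowId, range);
      nextOffset = end;
    }
    // On replay the recorded range is reused, so the window's contents are identical.
    return new ArrayList<>(log.subList(range[0], range[1]));
  }
}

/**
 * Hypothetical output model: stores a window's rows and its window ID in one step, the way a
 * transactional JDBC output would commit both in a single transaction.
 */
final class TransactionalSink {
  final List<String> rows = new ArrayList<>();
  private long committedWindowId = -1;

  void commitWindow(long windowId, List<String> windowRows) {
    if (windowId <= committedWindowId) {
      return; // committed before the failure: discard the replayed window instead of duplicating it
    }
    rows.addAll(windowRows);
    committedWindowId = windowId;
  }
}

final class ReplayDemo {
  public static void main(String[] args) {
    ReplayableInput input =
        new ReplayableInput(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h"));
    TransactionalSink sink = new TransactionalSink();

    sink.commitWindow(1, input.emit(1, 2)); // a, b
    sink.commitWindow(2, input.emit(2, 2)); // c, d
    input.emit(3, 2);                       // e, f read, then a failure before the commit

    // Recovery from the checkpoint taken after window 1: windows 2 and 3 are replayed.
    sink.commitWindow(2, input.emit(2, 2)); // discarded, already committed
    sink.commitWindow(3, input.emit(3, 2)); // e, f
    sink.commitWindow(4, input.emit(4, 2)); // g, h

    System.out.println(sink.rows); // prints [a, b, c, d, e, f, g, h]
  }
}
```

Replacing the window-ID check in commitWindow with a plain append reproduces the at-least-once default: the replayed window 2 would be written twice. A Kafka output cannot commit rows and a window ID atomically like this, so it has to detect a partially written window by other means; that case is not modeled here.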