Since the operator expects the topic as part of its configuration, you could
just supply a dummy value; it won't have any further effect.
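
For illustration, a minimal sketch of what that could look like in populateDAG
(the application class, the operator name "kafkaOut", and the placeholder value
are assumptions, not taken from this thread). The stream paths written to are
hard-coded in the operator's process() method below, so the topic property only
has to satisfy the "may not be null" validation:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;

@ApplicationAnnotation(name = "TenantStreamApp")
public class Application implements StreamingApplication {

    @Override
    public void populateDAG(DAG dag, Configuration conf) {
        KafkaSinglePortExactlyOnceOutputOperator kafkaOut =
                dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator());
        // Dummy topic: only present so the mandatory "topic" property is non-null;
        // the operator's process() method sends to hard-coded stream paths instead.
        kafkaOut.setTopic("unused-placeholder");
        // ... upstream operators and dag.addStream(...) wiring omitted ...
    }
}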

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Siyuan,
>
> So for the output operator, we have specified it as a part of our logic
> itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     // Input port receiving Tenant objects from the upstream operator.
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         // Valid record: write to the certUpdate topic of the stream.
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         // Invalid record: write to the error topic of the stream.
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>                     }
>                     producer.flush();
>                 }
>             } catch (Exception e) {
>                 LOG.error("Failed to publish tenant record", e);
>                 throw new RuntimeException(e);
>             }
>         }
>     };
> }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Jaspal,
>>
>> I think you're missing the kafkaOut  :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>> > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have given it in the properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. On MapR, the value
>>>> should be set to the full stream path, for example:
>>>> /your/stream/path:streamname
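>>>>
>>>> As an illustrative sketch only (the operator variable name and the stream
>>>> path are assumed), that value would go on the operator's topic property,
>>>> e.g. in populateDAG:
>>>>
>>>>     @Override
>>>>     public void populateDAG(DAG dag, Configuration conf) {
>>>>         KafkaSinglePortExactlyOnceOutputOperator kafkaOut =
>>>>                 dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator());
>>>>         // Full MapR stream path, plus ":" and the topic name inside the stream.
>>>>         kafkaOut.setTopic("/your/stream/path:streamname");
>>>>     }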
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error while
>>>>> launching the application:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> So I just changed the malhar-kafka version to 3.5.0 and was able to
>>>>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate 
>>>>>>>> module.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Siyuan,
>>>>>>>>>
>>>>>>>>> I am using the same Kafka producer as you mentioned, but I am not
>>>>>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library when importing.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks!!
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <
>>>>>>>>> hsy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>>>>>> Only the former works with MapR Streams; the latter only works with
>>>>>>>>>> Kafka 0.8.* or 0.9.
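>>>>>>>>>>
>>>>>>>>>> A minimal sketch of the difference (package names as stated above; the
>>>>>>>>>> operator body is the one shown earlier in this thread). Only the import
>>>>>>>>>> changes, but it determines which Kafka client the operator is built on:
>>>>>>>>>>
>>>>>>>>>> // malhar-kafka module -- per the note above, this is the one that works with MapR Streams:
>>>>>>>>>> import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
>>>>>>>>>> // malhar-contrib module (Kafka 0.8.*/0.9 only) -- not usable with MapR Streams:
>>>>>>>>>> // import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;
>>>>>>>>>>
>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>     // input port and process() logic as in the snippet earlier in the thread
>>>>>>>>>> }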
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <
>>>>>>>>>> hsy...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>>
>>>>>>>>>>> Did you add any code to the existing
>>>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please
>>>>>>>>>>> make sure the producer you use here is
>>>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is
>>>>>>>>>>> not supported by MapR Streams.
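>>>>>>>>>>>
>>>>>>>>>>> For illustration, a standalone sketch of the new-API producer (the class
>>>>>>>>>>> name, serializer settings, and stream path are assumptions; inside the
>>>>>>>>>>> Malhar operator you would keep using getKafkaProducer() rather than
>>>>>>>>>>> building one yourself):
>>>>>>>>>>>
>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>>>>> import org.apache.kafka.clients.producer.Producer;
>>>>>>>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>>>>>>
>>>>>>>>>>> public class NewApiProducerSketch {
>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>         Properties props = new Properties();
>>>>>>>>>>>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>>>>>>>>>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>>>>>>>>>>         // props.put("bootstrap.servers", "broker:9092"); // needed for plain Kafka brokers
>>>>>>>>>>>         Producer<String, String> producer = new KafkaProducer<String, String>(props);
>>>>>>>>>>>         // With MapR Streams the topic is the full stream path plus ":" and the topic name.
>>>>>>>>>>>         producer.send(new ProducerRecord<String, String>("/your/stream/path:streamname", "key", "value"));
>>>>>>>>>>>         producer.flush();
>>>>>>>>>>>         producer.close();
>>>>>>>>>>>     }
>>>>>>>>>>> }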
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Siyuan
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>>> operator receives an object of the Tenant class from the upstream
>>>>>>>>>>>> operator.
>>>>>>>>>>>>
>>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     // Input port receiving Tenant objects from the upstream operator.
>>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>>
>>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>>
>>>>>>>>>>>>             try {
>>>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>>                         // Valid record: write to the certUpdate topic of the stream.
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>>                     } else {
>>>>>>>>>>>>                         // Invalid record: write to the error topic of the stream.
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>>                 }
>>>>>>>>>>>>             } catch (Exception e) {
>>>>>>>>>>>>                 LOG.error("Failed to publish tenant record", e);
>>>>>>>>>>>>                 throw new RuntimeException(e);
>>>>>>>>>>>>             }
>>>>>>>>>>>>         }
>>>>>>>>>>>>     };
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>>>>
>>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we
>>>>>>>>>>>>> need to specify <String, T>, since we are getting an object of a
>>>>>>>>>>>>> class type from the previous operator?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>>> state of the previous application after relaunch? Since the data
>>>>>>>>>>>>>> is stored in MapR Streams, an operator that is a producer can
>>>>>>>>>>>>>> also act as a consumer. Please clarify your question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>>>> into a MapR Streams topic, will it be able to read messages
>>>>>>>>>>>>>>> from the previous operator?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org
>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
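>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A minimal sketch of both pieces, with assumed operator and variable
>>>>>>>>>>>>>>>> names (following the linked example): the WindowDataManager makes
>>>>>>>>>>>>>>>> replay after recovery idempotent, while initialOffset is the property
>>>>>>>>>>>>>>>> that matters for a cold restart:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
>>>>>>>>>>>>>>>> import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // Inside populateDAG(DAG dag, Configuration conf):
>>>>>>>>>>>>>>>> KafkaSinglePortInputOperator kafkaIn =
>>>>>>>>>>>>>>>>     dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>>>>>>>>>>>>>> // Checkpoints per-window offset ranges so replay after a failure is idempotent.
>>>>>>>>>>>>>>>> kafkaIn.setWindowDataManager(new FSWindowDataManager());
>>>>>>>>>>>>>>>> // For cold restart, also consider the operator's initialOffset property
>>>>>>>>>>>>>>>> // (e.g. APPLICATION_OR_LATEST), as discussed elsewhere in this thread.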
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ok, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges
>>>>>>>>>>>>>>>>> to replay in the same order from Kafka.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>>>>> why we also call it end-to-end exactly-once). There is no
>>>>>>>>>>>>>>>>>> such thing as exactly-once processing in a distributed
>>>>>>>>>>>>>>>>>> system; in this case it would rather be "produce
>>>>>>>>>>>>>>>>>> exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there.
>>>>>>>>>>>>>>>>>> This is at-least-once, the default behavior. Because you
>>>>>>>>>>>>>>>>>> have configured the input operator to replay in the same
>>>>>>>>>>>>>>>>>> order from Kafka (this is done by checkpointing the offset
>>>>>>>>>>>>>>>>>> ranges), the computation in the DAG is idempotent and the
>>>>>>>>>>>>>>>>>> output operator can discard the results that were already
>>>>>>>>>>>>>>>>>> published instead of producing duplicates.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think this is a customized operator implementation that
>>>>>>>>>>>>>>>>>>> takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make sure
>>>>>>>>>>>>>>>>>>> they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to a
>>>>>>>>>>>>>>>>>>>>> MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>>> t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent.
>>>>>>>>>>>>>>>>>>>>>> The results are written to JDBC. That operator by itself
>>>>>>>>>>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction
>>>>>>>>>>>>>>>>>>>>>> with idempotent input), hence there is no need to configure
>>>>>>>>>>>>>>>>>>>>>> the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
