Siyuan,

So for the output operator, we have specified the topic as part of our logic itself:

import com.google.gson.Gson;

import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
import org.apache.commons.lang3.StringUtils;  // or org.apache.commons.lang.StringUtils, depending on the classpath
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG =
            LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    // Tenant is our own POJO emitted by the upstream operator (assumed to be in the same package).
    public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        private final Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {
            if (tenant == null) {
                return;
            }
            try {
                Producer<String, String> producer = getKafkaProducer();
                // Records with a GL value go to the certUpdate stream, everything else to the error stream.
                if (StringUtils.isNotEmpty(tenant.getGl())) {
                    producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                } else {
                    producer.send(new ProducerRecord<String, String>(
                            "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                }
                producer.flush();
            } catch (Exception e) {
                LOG.error("Failed to publish tenant record", e);
                throw new RuntimeException(e);
            }
        }
    };
}
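
For reference, the launch error quoted below in this thread ("Operator kafkaOut violates constraints ... propertyPath='topic', message='may not be null'") suggests the operator still needs its topic property set in the properties file, even though the code above chooses the stream path per record. A minimal sketch of such an entry, assuming the operator is named kafkaOut in the DAG (the name comes from that error message) and reusing the same stream path:

    <property>
      <name>dt.operator.kafkaOut.prop.topic</name>
      <value>/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate</value>
    </property>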



Thanks!!

On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Jaspal,
>
> I think you missed the kafkaOut  :)
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
> wrote:
>
>> Siyuan,
>>
>> That's how we have given it in the properties file:
>>
>> [image: Inline image 1]
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com>
>> wrote:
>>
>>> Jaspal,
>>>
>>> Topic is a mandatory property you have to set. In MapR, the value should
>>> be set to the full stream path, for example: /your/stream/path:streamname
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>> jaspal.singh1...@gmail.com> wrote:
>>>
>>>> After making the change, we are getting the below error while
>>>> launching the application:
>>>>
>>>> *An error occurred trying to launch the application. Server message:
>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>> constraints
>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>> propertyPath='topic', message='may not be null', *
>>>>
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> So I just changed the malhar-kafka version to 3.5.0, and I was able to
>>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>>
>>>>> Thanks for your inputs !!
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Should we use malhar-library version 3.5 then?
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate module.
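>>>>>>>
>>>>>>> A minimal sketch of that dependency, assuming the 3.5.0 release is the one
>>>>>>> being used (adjust the version as needed):
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.apex</groupId>
>>>>>>>       <artifactId>malhar-kafka</artifactId>
>>>>>>>       <version>3.5.0</version>
>>>>>>>     </dependency>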
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Siyuan,
>>>>>>>>
>>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>>> seeing the AbstractKafkaOutputOperator in the malhar library when importing it.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks!!
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>> works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
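>>>>>>>>>
>>>>>>>>> For reference, a sketch of the imports this implies (the concrete class name
>>>>>>>>> in that package is AbstractKafkaOutputOperator, as used in the code elsewhere
>>>>>>>>> in this thread; the new producer classes come from kafka-clients):
>>>>>>>>>
>>>>>>>>>     import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
>>>>>>>>>     import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>>>     import org.apache.kafka.clients.producer.ProducerRecord;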
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <
>>>>>>>>> hsy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>
>>>>>>>>>> Did you add any code to the existing
>>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>>> from Malhar? If so, please make sure the producer you use here is
>>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is
>>>>>>>>>> not supported by MapR Streams.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas,
>>>>>>>>>>>
>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>> operator is getting an object of the Tenant class from the upstream
>>>>>>>>>>> operator.
>>>>>>>>>>>
>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends 
>>>>>>>>>>> AbstractKafkaOutputOperator {
>>>>>>>>>>>
>>>>>>>>>>>     private static final Logger LOG = 
>>>>>>>>>>> LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>
>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new 
>>>>>>>>>>> DefaultInputPort<Tenant>() {
>>>>>>>>>>>
>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>
>>>>>>>>>>>             try {
>>>>>>>>>>>                 Producer<String, String> producer = 
>>>>>>>>>>> getKafkaProducer();
>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>                         producer.send(new ProducerRecord<String, 
>>>>>>>>>>> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>>>>  tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>                     } else {
>>>>>>>>>>>                         producer.send(new ProducerRecord<String, 
>>>>>>>>>>> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>>>>  tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>
>>>>>>>>>>>                     }
>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>                 }
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do we
>>>>>>>>>>>> need to specify <String, T>? Since we are getting an object of a class
>>>>>>>>>>>> type from the previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>> state of the previous application after relaunch? Since the data 
>>>>>>>>>>>>> is stored
>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a 
>>>>>>>>>>>>> consumer.
>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>>>>>>>>>>> MapR Streams topic, will it be able to read messages from the previous
>>>>>>>>>>>>>> operator?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>>>>>>> on.java#L33
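>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A minimal sketch of what that looks like in populateDAG, assuming the Kafka
>>>>>>>>>>>>>>> input operator is named kafkaIn and an FSWindowDataManager is used (as in
>>>>>>>>>>>>>>> the linked example):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     KafkaSinglePortInputOperator kafkaIn =
>>>>>>>>>>>>>>>         dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>>>>>>>>>>>>>     kafkaIn.setWindowDataManager(new FSWindowDataManager());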
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed 
>>>>>>>>>>>>>>> state).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ok, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the
>>>>>>>>>>>>>>>> offset ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
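>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For reference, if that property is used, the entry would presumably take this
>>>>>>>>>>>>>>>> shape in the properties file (the operator name kafkaIn is an assumption):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     <property>
>>>>>>>>>>>>>>>>       <name>dt.operator.kafkaIn.prop.initialOffset</name>
>>>>>>>>>>>>>>>>       <value>APPLICATION_OR_LATEST</value>
>>>>>>>>>>>>>>>>     </property>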
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>>>> why we call it also end-to-end exactly-once). There is no 
>>>>>>>>>>>>>>>>> such thing as
>>>>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case 
>>>>>>>>>>>>>>>>> it would be
>>>>>>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure,
>>>>>>>>>>>>>>>>> will recover
>>>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. 
>>>>>>>>>>>>>>>>> This is
>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input 
>>>>>>>>>>>>>>>>> operator you have
>>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is 
>>>>>>>>>>>>>>>>> done by
>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG 
>>>>>>>>>>>>>>>>> is idempotent
>>>>>>>>>>>>>>>>> and the output operator can discard the results that were 
>>>>>>>>>>>>>>>>> already published
>>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think this is a customized operator
>>>>>>>>>>>>>>>>>> implementation that takes care of exactly-once
>>>>>>>>>>>>>>>>>> processing at the output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make
>>>>>>>>>>>>>>>>>> sure they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>>>> a MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>> t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once), there
>>>>>>>>>>>>>>>>>>>>> is Kafka input, which is configured to be idempotent. The 
>>>>>>>>>>>>>>>>>>>>> results are
>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports 
>>>>>>>>>>>>>>>>>>>>> exactly-once through
>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), 
>>>>>>>>>>>>>>>>>>>>> hence there is no need
>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
