Oh I see, you want to send to different topics. Well, then you have to give
some dummy value to the topic property on the operator.
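
Any non-null value will do, for example something like this in properties.xml
(a sketch, assuming the operator is named kafkaOut as in your earlier error
message):

  <property>
    <name>dt.operator.kafkaOut.prop.topic</name>
    <value>dummy</value>
  </property>

Since your process() method passes the full path:topic to each ProducerRecord
explicitly, this value only needs to satisfy the mandatory-property check.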

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Siyuan,
>
> So for the output operator, we have specified the topics as part of our logic
> itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG =
>         LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>(
>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>                             tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>(
>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>                             tenant.getVolumeName(), gson.toJson(tenant)));
>                     }
>                     producer.flush();
>                 }
>             }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Jaspal,
>>
>> I think you're missing the kafkaOut :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>> > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have specified it in the properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. In MapR, the value
>>>> should be set to the full stream path, for example:
>>>> /your/stream/path:streamname
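>>>>
>>>> You can set it either in the properties file or directly in populateDAG(),
>>>> e.g. something like this (assuming the operator instance is named kafkaOut):
>>>>
>>>>     kafkaOut.setTopic("/your/stream/path:streamname");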
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error when launching
>>>>> the application:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> So I just changed the malhar-kafka version to 3.5.0 and I was able to
>>>>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate 
>>>>>>>> module.
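>>>>>>>>
>>>>>>>> For reference, the dependency would look something like this (a sketch,
>>>>>>>> assuming the standard Apache Apex Malhar coordinates):
>>>>>>>>
>>>>>>>>     <dependency>
>>>>>>>>       <groupId>org.apache.apex</groupId>
>>>>>>>>       <artifactId>malhar-kafka</artifactId>
>>>>>>>>       <version>3.5.0</version>
>>>>>>>>     </dependency>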
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Siyuan,
>>>>>>>>>
>>>>>>>>> I am using the same Kafka producer as you mentioned, but I am not
>>>>>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library when I try to import it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks!!
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <
>>>>>>>>> hsy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator instead of
>>>>>>>>>> com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator. Only the former
>>>>>>>>>> works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
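>>>>>>>>>>
>>>>>>>>>> In other words, the import to check in your operator (class names as used
>>>>>>>>>> elsewhere in this thread; just a sketch):
>>>>>>>>>>
>>>>>>>>>>     // works with MapR Streams (malhar-kafka module)
>>>>>>>>>>     import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
>>>>>>>>>>     // old contrib operator, Kafka 0.8.*/0.9 only
>>>>>>>>>>     // import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;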
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <
>>>>>>>>>> hsy...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>>
>>>>>>>>>>> Did you add any code to the existing
>>>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please make
>>>>>>>>>>> sure the producer you use here is
>>>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>>>>>>>>>>> supported by MapR Streams.
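>>>>>>>>>>>
>>>>>>>>>>> That is, the imports to look for (a sketch of the distinction only):
>>>>>>>>>>>
>>>>>>>>>>>     // new Kafka client API - supported by MapR Streams
>>>>>>>>>>>     import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>>>>>     import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>>>>>>     // old API - not supported by MapR Streams
>>>>>>>>>>>     // import kafka.javaapi.producer.Producer;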
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Siyuan
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>>> operator receives an object of the Tenant class from the upstream
>>>>>>>>>>>> operator.
>>>>>>>>>>>>
>>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final Logger LOG =
>>>>>>>>>>>>         LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>>
>>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>>
>>>>>>>>>>>>             try {
>>>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>>                     } else {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>>                 }
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>>>>
>>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do we
>>>>>>>>>>>>> need to specify <String, T>? Since we are getting an object of a
>>>>>>>>>>>>> class type from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>>> state of the previous application after relaunch? Since the data 
>>>>>>>>>>>>>> is stored
>>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as 
>>>>>>>>>>>>>> a consumer.
>>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>>>>>>>>>>>> MapR Streams topic, will it be able to read messages from the
>>>>>>>>>>>>>>> previous operator?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org
>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For recovery you need to set the window data manager like
>>>>>>>>>>>>>>>> so:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
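>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> i.e., something along these lines in populateDAG() (a sketch based on
>>>>>>>>>>>>>>>> the linked example; the operator name kafkaIn is illustrative):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     KafkaSinglePortInputOperator kafkaIn =
>>>>>>>>>>>>>>>>         dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>>>>>>>>>>>>>>     // checkpoint per-window offset ranges so replay is idempotent
>>>>>>>>>>>>>>>>     kafkaIn.setWindowDataManager(new FSWindowDataManager());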
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed 
>>>>>>>>>>>>>>>> state).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to
>>>>>>>>>>>>>>>>> replay in the same order from Kafka.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output
>>>>>>>>>>>>>>>>>> (that's why we call it also end-to-end exactly-once). There 
>>>>>>>>>>>>>>>>>> is no such
>>>>>>>>>>>>>>>>>> thing as exactly-once processing in a distributed system. In 
>>>>>>>>>>>>>>>>>> this case it
>>>>>>>>>>>>>>>>>> would rather be "produce exactly-once". Upstream operators,
>>>>>>>>>>>>>>>>>> on failure, will
>>>>>>>>>>>>>>>>>> recover to checkpointed state and re-process the stream from 
>>>>>>>>>>>>>>>>>> there. This is
>>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because you have configured the
>>>>>>>>>>>>>>>>>> input operator to replay in the same order from Kafka (this is
>>>>>>>>>>>>>>>>>> done by
>>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG 
>>>>>>>>>>>>>>>>>> is idempotent
>>>>>>>>>>>>>>>>>> and the output operator can discard the results that were 
>>>>>>>>>>>>>>>>>> already published
>>>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think this is a customized operator implementation that takes
>>>>>>>>>>>>>>>>>>> care of exactly-once processing at the output.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make sure they
>>>>>>>>>>>>>>>>>>> also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to a MapR Streams
>>>>>>>>>>>>>>>>>>>>> topic based on some validations.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>>> t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be 
>>>>>>>>>>>>>>>>>>>>>> idempotent. The results are
>>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports 
>>>>>>>>>>>>>>>>>>>>>> exactly-once through
>>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), 
>>>>>>>>>>>>>>>>>>>>>> hence there is no need
>>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
