So I just changed the malhar-kafka version to 3.5.0, and I was able to import
the AbstractOutputOperator. Let me try to launch it now.

Thanks for your inputs!!



On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Should we use malhar-library version 3.5 then?
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>
>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>> This operator is not in malhar-library, it's a separate module.
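>>
>> For reference, the dependency would look something like this in pom.xml
>> (version per this thread; groupId/artifactId as published for Apache Apex
>> Malhar releases):

```xml
<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>3.5.0</version>
</dependency>
```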
>>
>>
>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>> > wrote:
>>
>>> Hi Siyuan,
>>>
>>> I am using the same Kafka producer as you mentioned. But I am not seeing
>>> the AbstractKafkaOutputOperator in malhar library while import.
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> Also, which Kafka output operator are you using?
>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
>>>> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Jaspal,
>>>>>
>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>> from Malhar?  If so, please make sure the producer you use is
>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>> kafka.javaapi.producer.Producer.  The latter is the old API, which is
>>>>> not supported by MapR Streams.
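>>>>>
>>>>> The new-API producer is configured through a plain java.util.Properties
>>>>> object. A minimal sketch of such a config (the class name and the
>>>>> serializer choices below are illustrative assumptions, not values from
>>>>> this thread; the Properties are what you would hand to
>>>>> org.apache.kafka.clients.producer.KafkaProducer):

```java
import java.util.Properties;

// Sketch: configuration for the new-API producer
// (org.apache.kafka.clients.producer.KafkaProducer).
public class ProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        // With MapR Streams the full stream:topic path names the destination,
        // but the client still expects serializer settings like plain Kafka.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```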
>>>>>
>>>>>
>>>>> Regards,
>>>>> Siyuan
>>>>>
>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Thomas,
>>>>>>
>>>>>> Below is the operator implementation we are trying to run. This
>>>>>> operator receives an object of the Tenant class from the upstream operator.
>>>>>>
>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>
>>>>>>     private static final Logger LOG =
>>>>>>         LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>
>>>>>>     public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>
>>>>>>         private final Gson gson = new Gson();
>>>>>>
>>>>>>         @Override
>>>>>>         public void process(Tenant tenant) {
>>>>>>             try {
>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>                 if (tenant != null) {
>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>                     } else {
>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>                     }
>>>>>>                     producer.flush();
>>>>>>                 }
>>>>>>             } catch (Exception e) {
>>>>>>                 LOG.error("Failed to write tenant record to stream", e);
>>>>>>             }
>>>>>>         }
>>>>>>     };
>>>>>> }
>>>>>>
>>>>>>
>>>>>> After building the application, it throws an error during launch:
>>>>>>
>>>>>> An error occurred trying to launch the application. Server message:
>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thomas,
>>>>>>>
>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>
>>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need
>>>>>>> to specify <String, T>, since we are getting an object of a class type
>>>>>>> from the previous operator?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Are you referring to the upstream operator in the DAG or the state
>>>>>>>> of the previous application after relaunch? Since the data is stored in
>>>>>>>> MapR Streams, an operator that is a producer can also act as a
>>>>>>>> consumer.
>>>>>>>> Please clarify your question.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> I have a question: when we use
>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into a
>>>>>>>>> MapR Streams topic, will it be able to read messages from the
>>>>>>>>> previous operator?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>
>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>> on.java#L33
>>>>>>>>>>
>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>
>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>
>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to
>>>>>>>>>>> replay in the same order from Kafka.
>>>>>>>>>>>
>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST?
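>>>>>>>>>>>
>>>>>>>>>>> (In an Apex properties file that setting would look something like
>>>>>>>>>>> the line below; the operator name "kafkaInput" is an assumption,
>>>>>>>>>>> substitute whatever name your DAG gives the Kafka input operator.)

```properties
dt.operator.kafkaInput.prop.initialOffset=APPLICATION_OR_LATEST
```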
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why
>>>>>>>>>>>> we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it 
>>>>>>>>>>>> would be
>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>> recover
>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator 
>>>>>>>>>>>> you have
>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is 
>>>>>>>>>>>> idempotent
>>>>>>>>>>>> and the output operator can discard the results that were already 
>>>>>>>>>>>> published
>>>>>>>>>>>> instead of producing duplicates.
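>>>>>>>>>>>>
>>>>>>>>>>>> A toy illustration of that discard step in plain Java (all names
>>>>>>>>>>>> and the window-id bookkeeping are invented for illustration; the
>>>>>>>>>>>> real operator determines what was already published by reading
>>>>>>>>>>>> back from the topic itself):

```java
import java.util.HashSet;
import java.util.Set;

// Toy sketch of exactly-once output: with idempotent (same-order) replay,
// the writer can discard tuples from windows it already published before
// the failure, instead of sending duplicates.
public class DedupSketch {
    private final Set<Long> publishedWindows = new HashSet<>();
    private int sends = 0;

    // Returns true if the tuple was sent, false if discarded as a duplicate.
    // The tuple payload is unused here; only the window id matters.
    public boolean send(long windowId, String tuple) {
        if (publishedWindows.contains(windowId)) {
            return false; // this window was already published; discard
        }
        sends++;
        return true;
    }

    // Mark a window as durably published (in the real operator this state is
    // recovered from the topic after a restart).
    public void committed(long windowId) {
        publishedWindows.add(windowId);
    }

    public int sendCount() {
        return sends;
    }
}
```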
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this is a customized operator implementation that takes
>>>>>>>>>>>>> care of exactly-once processing at the output.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What if any previous operators fail? How can we make sure they
>>>>>>>>>>>>> also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In our case we are writing the results back to a MapR Streams
>>>>>>>>>>>>>>> topic based on some validations.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is
>>>>>>>>>>>>>>>> configured to be idempotent. The results are written to JDBC. 
>>>>>>>>>>>>>>>> That operator
>>>>>>>>>>>>>>>> by itself supports exactly-once through transactions (in 
>>>>>>>>>>>>>>>> conjunction with
>>>>>>>>>>>>>>>> idempotent input), hence there is no need to configure the 
>>>>>>>>>>>>>>>> processing mode
>>>>>>>>>>>>>>>> at all.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
