Hi Siyuan,

I am using the same Kafka producer as you mentioned. But I am not seeing
the AbstractKafkaOutputOperator in malhar library while import.


Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Also which kafka output operator you are using?
> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with
> MapR stream, the latter only works with kafka 0.8.* or 0.9
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
>> Hey Jaspal,
>>
>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>> from malhar?  If so please make sure the producer you use here is
>> org.apache.kafka.clients.producer.KafkaProducer instead of
>> kafka.javaapi.producer.Producer.  That is old api and that is not
>> supported by MapR stream.
>>
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
>> wrote:
>>
>>> Thomas,
>>>
>>> Below is the operator implementation we are trying to run. This operator
>>> is getting an object of tenant class from updtream operator.
>>>
>>> public class KafkaSinglePortExactlyOnceOutputOperator extends 
>>> AbstractKafkaOutputOperator {
>>>
>>>     private static final Logger LOG = 
>>> LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>
>>>     public transient final DefaultInputPort<Tenant> in = new 
>>> DefaultInputPort<Tenant>() {
>>>
>>>         Gson gson = new Gson();
>>>
>>>         @Override
>>>         public void process(Tenant tenant) {
>>>
>>>             try {
>>>                 Producer<String, String> producer = getKafkaProducer();
>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>                 long now = System.currentTimeMillis();
>>>                 //Configuration conf = HBaseConfiguration.create();
>>>                 //TenantDao dao = new TenantDao(conf);
>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>                 if (tenant != null) {
>>>                     //Tenant tenant = tenant.next();
>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>                         producer.send(new ProducerRecord<String, 
>>> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>  tenant.getVolumeName(), gson.toJson(tenant)));
>>>                         //puts.add(dao.mkPut(tenant));
>>>                     } else {
>>>                         producer.send(new ProducerRecord<String, 
>>> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>  tenant.getVolumeName(), gson.toJson(tenant)));
>>>
>>>                     }
>>>                     producer.flush();
>>>                 }
>>>             }
>>>
>>>
>>> After building the application, it throws error during launch:
>>>
>>> An error occurred trying to launch the application. Server message:
>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>> jaspal.singh1...@gmail.com> wrote:
>>>
>>>> Thomas,
>>>>
>>>> I was trying to refer to the input from previous operator.
>>>>
>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>> from previous operator.
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Are you referring to the upstream operator in the DAG or the state of
>>>>> the previous application after relaunch? Since the data is stored in MapR
>>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>>> clarify your question.
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> I have a question, so when we are using
>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>>> operator ?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>
>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>> on.java#L33
>>>>>>>
>>>>>>> That will also apply to stateful restart of the entire application
>>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>>
>>>>>>> For cold restart, you would need to consider the property you
>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>
>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>
>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>> exactly-once processing in a distributed system. In this case it 
>>>>>>>>> would be
>>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will 
>>>>>>>>> recover
>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>> at-least-once, the default behavior. Because in the input operator 
>>>>>>>>> you have
>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is 
>>>>>>>>> idempotent
>>>>>>>>> and the output operator can discard the results that were already 
>>>>>>>>> published
>>>>>>>>> instead of producing duplicates.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>> implementation that is taking care of exactly once processing at 
>>>>>>>>>> output.
>>>>>>>>>>
>>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>
>>>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>>>> the stated assumptions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>
>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured
>>>>>>>>>>>>> to be idempotent. The results are written to JDBC. That operator 
>>>>>>>>>>>>> by itself
>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with 
>>>>>>>>>>>>> idempotent
>>>>>>>>>>>>> input), hence there is no need to configure the processing mode 
>>>>>>>>>>>>> at all.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to