Jaspal,

I think you're missing the kafkaOut in the property name :)
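
For example (a sketch, assuming the operator is added to the DAG as "kafkaOut" and using the stream path format mentioned below):

dt.operator.kafkaOut.prop.topic=/your/stream/path:streamname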

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Siyuan,
>
> That's how we have specified it in the properties file:
>
> [image: Inline image 1]
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Jaspal,
>>
>> Topic is a mandatory property you have to set. In MapR, the value should
>> be set to the full stream path, for example: /your/stream/path:streamname
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>> > wrote:
>>
>>> After making the change, we are getting the below error during
>>> application launch:
>>>
>>> *An error occurred trying to launch the application. Server message:
>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>> constraints
>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>> propertyPath='topic', message='may not be null', *
>>>
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com
>>> > wrote:
>>>
>>>> So I just changed the malhar-kafka version to 3.5.0 and was able to
>>>> import the AbstractKafkaOutputOperator. Let me try to launch it now.
>>>>
>>>> Thanks for your inputs !!
>>>>
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Should we use malhar-library version 3.5 then ?
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>>> This operator is not in malhar-library, it's a separate module.
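>>>>>>
>>>>>> A sketch of the pom.xml entry (assuming the 3.5.0 release):
>>>>>>
>>>>>> <dependency>
>>>>>>   <groupId>org.apache.apex</groupId>
>>>>>>   <artifactId>malhar-kafka</artifactId>
>>>>>>   <version>3.5.0</version>
>>>>>> </dependency>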
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Siyuan,
>>>>>>>
>>>>>>> I am using the same Kafka producer as you mentioned, but I am not
>>>>>>> seeing the AbstractKafkaOutputOperator in the Malhar library on import.
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>>>> Only the org.apache.apex.malhar.kafka operator works
>>>>>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
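>>>>>>>>
>>>>>>>> As imports, that difference looks like this (a sketch, assuming malhar-kafka 3.5 is on the classpath):
>>>>>>>>
>>>>>>>> // works with MapR Streams (new Kafka client API):
>>>>>>>> import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
>>>>>>>> // old operator based on the 0.8/0.9 client, not usable with MapR Streams:
>>>>>>>> // import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;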
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hey Jaspal,
>>>>>>>>>
>>>>>>>>> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>> from Malhar? If so, please make sure the producer you use here is
>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API and is not
>>>>>>>>> supported by MapR Streams.
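>>>>>>>>>
>>>>>>>>> For reference, the producer-side classes from the new client API (imports only):
>>>>>>>>>
>>>>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>>>> // avoid the old Scala client API:
>>>>>>>>> // import kafka.javaapi.producer.Producer;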
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,
>>>>>>>>>>
>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>> operator receives an object of the Tenant class from the upstream operator.
>>>>>>>>>>
>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>
>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>
>>>>>>>>>>     public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>
>>>>>>>>>>         private final Gson gson = new Gson();
>>>>>>>>>>
>>>>>>>>>>         @Override
>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>             try {
>>>>>>>>>>                 // getKafkaProducer() comes from the Kafka output operator base class
>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>                     } else {
>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>>>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>                     }
>>>>>>>>>>                     producer.flush();
>>>>>>>>>>                 }
>>>>>>>>>>             } catch (Exception e) {
>>>>>>>>>>                 LOG.error("Failed to publish tenant record", e);
>>>>>>>>>>                 throw new RuntimeException(e);
>>>>>>>>>>             }
>>>>>>>>>>         }
>>>>>>>>>>     };
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>>
>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>
>>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>>>> we need to specify <String, T>? Since we are getting an object of a class
>>>>>>>>>>> type from the previous operator.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>> state of the previous application after relaunch? Since the data 
>>>>>>>>>>>> is stored
>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a 
>>>>>>>>>>>> consumer.
>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a question: when we are using
>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>> into a MapR Streams topic, will it be able to read messages from the previous
>>>>>>>>>>>>> operator?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
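>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A minimal sketch of that in populateDAG(), assuming a malhar-kafka input operator named "kafkaIn" (names are placeholders):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // KafkaSinglePortInputOperator is from org.apache.apex.malhar.kafka,
>>>>>>>>>>>>>> // FSWindowDataManager from org.apache.apex.malhar.lib.wal
>>>>>>>>>>>>>> KafkaSinglePortInputOperator kafkaIn =
>>>>>>>>>>>>>>     dag.addOperator("kafkaIn", KafkaSinglePortInputOperator.class);
>>>>>>>>>>>>>> // persist per-window offsets so the same data is replayed after recovery
>>>>>>>>>>>>>> kafkaIn.setWindowDataManager(new FSWindowDataManager());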
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed 
>>>>>>>>>>>>>> state).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>>>> mention and decide what is applicable to your use case.
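>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, in properties.xml (a sketch; the operator name "kafkaIn" is a placeholder):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> <property>
>>>>>>>>>>>>>>   <name>dt.operator.kafkaIn.prop.initialOffset</name>
>>>>>>>>>>>>>>   <value>APPLICATION_OR_LATEST</value>
>>>>>>>>>>>>>> </property>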
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> OK, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset
>>>>>>>>>>>>>>> ranges to replay in the same order from Kafka.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there any property we need to configure to do that, like
>>>>>>>>>>>>>>> setting initialOffset to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>>> why we also call it end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>>>> exactly-once processing in a distributed system; in this case it would
>>>>>>>>>>>>>>>> rather be "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because the input operator is
>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think this is a customized operator
>>>>>>>>>>>>>>>>> implementation that takes care of exactly-once processing at the output.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What if any previous operators fail? How can we make sure
>>>>>>>>>>>>>>>>> they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to a
>>>>>>>>>>>>>>>>>>> MapR Streams topic based on some validations.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there
>>>>>>>>>>>>>>>>>>>> is Kafka input, which is configured to be idempotent. The 
>>>>>>>>>>>>>>>>>>>> results are
>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports 
>>>>>>>>>>>>>>>>>>>> exactly-once through
>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence 
>>>>>>>>>>>>>>>>>>>> there is no need
>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
