Jaspal,

The topic is a mandatory property that you have to set. In MapR, the value
should be set to the full stream path plus the topic name, for example:
/your/stream/path:topicname
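As a rough illustration of what a valid value looks like, here is a stdlib-only sketch that assumes the MapR convention of an absolute stream path, a colon, and then the topic name. `MaprTopic` and `parse` are hypothetical helpers, not part of Malhar or the MapR client. In the real application you only need to set the operator's `topic` property to such a string. A null value is rejected here in the same spirit as the `may not be null` constraint violation.

```java
import java.util.Objects;

/** Hypothetical helper: splits a MapR Streams topic string of the form "/stream/path:topic". */
final class MaprTopic {
    final String streamPath;
    final String topicName;

    private MaprTopic(String streamPath, String topicName) {
        this.streamPath = streamPath;
        this.topicName = topicName;
    }

    static MaprTopic parse(String value) {
        // Mirrors the "may not be null" constraint on the operator's topic property.
        Objects.requireNonNull(value, "topic may not be null");
        int colon = value.lastIndexOf(':');
        // Require an absolute stream path, a non-empty path before the colon,
        // and a non-empty topic name after it.
        if (!value.startsWith("/") || colon <= 1 || colon == value.length() - 1) {
            throw new IllegalArgumentException("expected /stream/path:topic, got: " + value);
        }
        return new MaprTopic(value.substring(0, colon), value.substring(colon + 1));
    }

    public static void main(String[] args) {
        MaprTopic t = parse("/your/stream/path:topicname");
        System.out.println(t.streamPath + " -> " + t.topicName);
    }
}
```

A bare Kafka-style topic name such as `certUpdate` fails this check, which is the usual mistake when porting a Kafka configuration to MapR Streams.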

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> After making the change, we get the following error when launching the
> application:
>
> An error occurred trying to launch the application. Server message:
> javax.validation.ConstraintViolationException: Operator kafkaOut violates
> constraints
> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
> propertyPath='topic', message='may not be null',
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com>
> wrote:
>
>> So I just changed the malhar-kafka version to 3.5.0, and I was able to
>> import AbstractKafkaOutputOperator. Let me try to launch it now.
>>
>> Thanks for your input!!
>>
>>
>>
>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1...@gmail.com>
>> wrote:
>>
>>> Should we use malhar-library version 3.5, then?
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>> This operator is not in malhar-library; it is in a separate module.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Hi Siyuan,
>>>>>
>>>>> I am using the same Kafka producer you mentioned, but I do not see
>>>>> AbstractKafkaOutputOperator in malhar-library when I try to import it.
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Also, which Kafka output operator are you using?
>>>>>> Please use org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
>>>>>> instead of com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator.
>>>>>> Only org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator works
>>>>>> with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Jaspal,
>>>>>>>
>>>>>>> Did you add any code to the existing
>>>>>>> KafkaSinglePortExactlyOnceOutputOperator from Malhar? If so, please
>>>>>>> make sure the producer you use there is
>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>> kafka.javaapi.producer.Producer. The latter is the old API, which
>>>>>>> MapR Streams does not support.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>> operator receives an object of the Tenant class from the upstream operator.
>>>>>>>>
>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>
>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>
>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>
>>>>>>>>         Gson gson = new Gson();
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>
>>>>>>>>             try {
>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>                 if (tenant != null) {
>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>                                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>>>                                 tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>                     } else {
>>>>>>>>                         producer.send(new ProducerRecord<String, String>(
>>>>>>>>                                 "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>>>                                 tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>
>>>>>>>>                     }
>>>>>>>>                     producer.flush();
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>
>>>>>>>>
>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>
>>>>>>>> An error occurred trying to launch the application. Server message:
>>>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>
>>>>>>>>> Another thing: when we extend AbstractKafkaOutputOperator, do we need
>>>>>>>>> to specify <String, T>, since we are getting an object of our own
>>>>>>>>> class type from the previous operator?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you referring to the upstream operator in the DAG or to the
>>>>>>>>>> state of the previous application after relaunch? Since the data is
>>>>>>>>>> stored in MapR Streams, an operator that is a producer can also act
>>>>>>>>>> as a consumer. Please clarify your question.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I have a question: when we use
>>>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator to write results into a
>>>>>>>>>>> MapR Streams topic, will it be able to read messages from the
>>>>>>>>>>> previous operator?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For recovery, you need to set the window data manager like so:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>>>>>>>>>
>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>
>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
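The recovery idea Thomas describes can be sketched with the standard library alone: for each streaming window, the input side records the offset range it consumed before emitting, and after a failure it re-reads exactly that recorded range instead of whatever is newest. `OffsetRange`, `OffsetRangeLog`, and `ReplayDemo` are hypothetical stand-ins for what a window data manager persists; this is not the Malhar API, and a real implementation would write the log to a durable file system rather than keep it in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical record of the offsets consumed in one streaming window. */
final class OffsetRange {
    final long start; // inclusive
    final long end;   // exclusive

    OffsetRange(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

/** Hypothetical in-memory stand-in for a window data manager's per-window log. */
final class OffsetRangeLog {
    private final TreeMap<Long, OffsetRange> byWindow = new TreeMap<>();

    /** Record the range before the window's tuples are emitted. */
    void save(long windowId, OffsetRange range) {
        byWindow.put(windowId, range);
    }

    /** On replay, return the recorded range, or null if the window was never logged. */
    OffsetRange retrieve(long windowId) {
        return byWindow.get(windowId);
    }

    /** Largest window ID that has a recorded range, or -1 if none. */
    long largestRecoveryWindow() {
        return byWindow.isEmpty() ? -1 : byWindow.lastKey();
    }
}

final class ReplayDemo {
    /** Emit offsets for a window: replay the logged range if present, else take new data. */
    static List<Long> emitWindow(OffsetRangeLog log, long windowId, long nextOffset, int batchSize) {
        OffsetRange range = log.retrieve(windowId);
        if (range == null) {
            range = new OffsetRange(nextOffset, nextOffset + batchSize);
            log.save(windowId, range);
        }
        List<Long> emitted = new ArrayList<>();
        for (long o = range.start; o < range.end; o++) {
            emitted.add(o);
        }
        return emitted;
    }

    public static void main(String[] args) {
        OffsetRangeLog log = new OffsetRangeLog();
        List<Long> first = emitWindow(log, 1L, 100L, 3);
        // After a failure, window 1 is replayed; newer data exists (nextOffset 500),
        // but the logged range is reused, so the same tuples come out in the same order.
        List<Long> replay = emitWindow(log, 1L, 500L, 3);
        System.out.println(first + " " + replay);
    }
}
```

Because every replayed window reproduces the same input, downstream operators compute the same results, which is what makes the output side's deduplication possible.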
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> OK, now I get it. Thanks for the nice explanation!!
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more thing: you mentioned checkpointing the offset ranges to
>>>>>>>>>>>>> replay in the same order from Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there any property we need to configure to do that, like setting
>>>>>>>>>>>>> initialOffset to APPLICATION_OR_LATEST?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>>>>>>> also call it end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it
>>>>>>>>>>>>>> would rather be "produce exactly-once". Upstream operators, on
>>>>>>>>>>>>>> failure, will recover to checkpointed state and re-process the
>>>>>>>>>>>>>> stream from there. This is at-least-once, the default behavior.
>>>>>>>>>>>>>> Because you have configured the input operator to replay in the
>>>>>>>>>>>>>> same order from Kafka (this is done by checkpointing the offset
>>>>>>>>>>>>>> ranges), the computation in the DAG is idempotent, and the output
>>>>>>>>>>>>>> operator can discard the results that were already published
>>>>>>>>>>>>>> instead of producing duplicates.
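The output side of that argument can be sketched in a few lines, assuming the operator can learn the largest window ID whose results were already published (the real operator has its own recovery bookkeeping; here it is just a constructor argument). Since replay is deterministic, a replayed window's results match what was sent before, so dropping them avoids duplicates. `DedupingSink` and its methods are hypothetical names, not the Malhar operator's code, and the sketch ignores the harder case of a window that failed halfway through sending, which a real implementation also has to handle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical output sink that drops results of windows it has already published. */
final class DedupingSink {
    private long largestPublishedWindow; // recovered state; -1 means nothing published yet
    private long currentWindow;
    private final List<String> published = new ArrayList<>();

    DedupingSink(long recoveredLargestPublishedWindow) {
        this.largestPublishedWindow = recoveredLargestPublishedWindow;
    }

    void beginWindow(long windowId) {
        currentWindow = windowId;
    }

    void process(String tuple) {
        // Replayed window: its (idempotent) output was already sent, so discard it.
        if (currentWindow <= largestPublishedWindow) {
            return;
        }
        published.add(tuple);
    }

    void endWindow() {
        if (currentWindow > largestPublishedWindow) {
            largestPublishedWindow = currentWindow;
        }
    }

    List<String> published() {
        return published;
    }

    public static void main(String[] args) {
        // Simulate recovery: windows up to 7 were published before the failure.
        DedupingSink sink = new DedupingSink(7L);
        for (long w = 6; w <= 8; w++) {
            sink.beginWindow(w);
            sink.process("result-" + w);
            sink.endWindow();
        }
        System.out.println(sink.published()); // only window 8 is new
    }
}
```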
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this is what is called a customized operator
>>>>>>>>>>>>>>> implementation, which takes care of exactly-once processing at
>>>>>>>>>>>>>>> the output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What if any of the previous operators fail? How can we make sure
>>>>>>>>>>>>>>> they also recover using the EXACTLY_ONCE processing mode?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>> thomas.we...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In our case we are writing the results back to a MapR Streams
>>>>>>>>>>>>>>>>> topic based on some validations.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Which operators in your application are writing to external
>>>>>>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When you look at the example from the blog
>>>>>>>>>>>>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The
>>>>>>>>>>>>>>>>>> results are written to JDBC. That operator by itself supports
>>>>>>>>>>>>>>>>>> exactly-once through transactions (in conjunction with
>>>>>>>>>>>>>>>>>> idempotent input), hence there is no need to configure the
>>>>>>>>>>>>>>>>>> processing mode at all.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
