Re: Datatorrent fault tolerance
Oh I see, you want to send to different topics. Well, then you have to give some dummy value to the topic property on the operator.

Regards,
Siyuan
Re: Datatorrent fault tolerance
Since the operator expects it as part of the configuration, you could just supply a dummy value that won't have any further effect.
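A minimal sketch of the approach suggested above, assuming the destination is chosen per record inside the operator and the configured topic only has to be non-null. `Tenant`, `TopicRouter`, the placeholder value and the stream path are hypothetical stand-ins for illustration, not Malhar or Kafka classes; in the real operator the chosen name would be what gets passed to `ProducerRecord`.

```java
import java.util.Objects;

/** Hypothetical stand-in for the tuple type mentioned in the thread. */
final class Tenant {
    private final String gl;
    private final String volumeName;

    Tenant(String gl, String volumeName) {
        this.gl = gl;
        this.volumeName = volumeName;
    }

    String getGl() { return gl; }

    String getVolumeName() { return volumeName; }
}

/** Chooses a destination per record; the configured topic is only a placeholder. */
final class TopicRouter {
    // Satisfies the mandatory topic property but is never used for sending.
    private final String configuredTopic;
    // Assumed stream path, for example "/your/stream/path".
    private final String streamPath;

    TopicRouter(String configuredTopic, String streamPath) {
        this.configuredTopic = Objects.requireNonNull(configuredTopic, "topic may not be null");
        this.streamPath = Objects.requireNonNull(streamPath, "streamPath may not be null");
    }

    /** Records with a non-empty gl go to certUpdate, all others to error. */
    String topicFor(Tenant tenant) {
        boolean hasGl = tenant.getGl() != null && !tenant.getGl().isEmpty();
        return streamPath + ":" + (hasGl ? "certUpdate" : "error");
    }

    String getConfiguredTopic() { return configuredTopic; }
}
```

With a router like this, `new TopicRouter("dummy", "/your/stream/path")` keeps the mandatory property satisfied while `topicFor` decides where each record actually goes.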
Re: Datatorrent fault tolerance
Siyuan,

So for the output operator, we have specified it as a part of our logic itself.

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {

            try {
                Producer<String, String> producer = getKafkaProducer();
                //ObjectMapper mapper = new ObjectMapper();
                long now = System.currentTimeMillis();
                //Configuration conf = HBaseConfiguration.create();
                //TenantDao dao = new TenantDao(conf);
                //ArrayList puts = new ArrayList<>();
                if (tenant != null) {
                    //Tenant tenant = tenant.next();
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                        //puts.add(dao.mkPut(tenant));
                    } else {
                        producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                            tenant.getVolumeName(), gson.toJson(tenant)));
                    }
                    producer.flush();
                }
            }

Thanks!!
Re: Datatorrent fault tolerance
Jaspal,

I think you missed the kafkaOut :)

Regards,
Siyuan
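The hint appears to be that the property key has to contain the operator's name in the DAG. A minimal sketch, assuming the commonly documented Apex key pattern `dt.operator.<operatorName>.prop.<propertyName>` and the operator name `kafkaOut` as it appears in the launch error; the helper class and the stream path value are hypothetical, and `java.util.Properties` is used only to keep the example self-contained (the application itself reads its own properties file).

```java
import java.util.Properties;

final class OperatorPropertyKeys {
    /** Builds a key of the assumed form dt.operator.<operatorName>.prop.<propertyName>. */
    static String key(String operatorName, String propertyName) {
        return "dt.operator." + operatorName + ".prop." + propertyName;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        // The value is a placeholder stream path, not one taken from a real cluster.
        conf.setProperty(key("kafkaOut", "topic"), "/your/stream/path:streamname");
        conf.list(System.out);
    }
}
```

If the key omits the operator segment, the setting would presumably never reach the operator and `topic` would stay null at launch.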
Re: Datatorrent fault tolerance
Siyuan,

That's how we have given it in the properties file:

[image: Inline image 1]

Thanks!!
Re: Datatorrent fault tolerance
Jaspal,

Topic is a mandatory property you have to set. In MapR, the value should be set to the full stream path, for example: /your/stream/path:streamname

Regards,
Siyuan
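A minimal sketch of checking a topic value against the `/your/stream/path:streamname` shape described above, assuming the path is absolute and the last colon separates the path from the name. `StreamTopic` is a hypothetical helper, not part of MapR or Malhar.

```java
final class StreamTopic {
    final String streamPath;
    final String name;

    private StreamTopic(String streamPath, String name) {
        this.streamPath = streamPath;
        this.name = name;
    }

    /** Splits "/your/stream/path:streamname" at the last colon, rejecting other shapes. */
    static StreamTopic parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("topic may not be null");
        }
        int colon = value.lastIndexOf(':');
        boolean wellFormed = value.startsWith("/") && colon > 0 && colon < value.length() - 1;
        if (!wellFormed) {
            throw new IllegalArgumentException("expected /stream/path:name but got: " + value);
        }
        return new StreamTopic(value.substring(0, colon), value.substring(colon + 1));
    }
}
```

A check like this could run in the operator's setup so that a bare topic name fails early instead of at the first send.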
Re: Datatorrent fault tolerance
After making the change, we are getting the error below during application launch:

*An error occurred trying to launch the application. Server message:
javax.validation.ConstraintViolationException: Operator kafkaOut violates constraints
[ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
propertyPath='topic', message='may not be null', *

Thanks!!
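The error above reports that the operator's `topic` property was still null when the application was validated at launch. The following is only a hand-rolled sketch of that kind of check, assuming a marker annotation and reflection; `Required`, `DemoOperator` and `LaunchCheck` are hypothetical and this is not the Bean Validation API that produced the message.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker playing the role of a not-null constraint. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Required {
}

/** Hypothetical operator with one mandatory property. */
final class DemoOperator {
    @Required
    String topic;
}

final class LaunchCheck {
    /** Returns one message per @Required field that is still null. */
    static List<String> violations(Object bean) {
        List<String> found = new ArrayList<>();
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(Required.class)) {
                continue;
            }
            field.setAccessible(true);
            try {
                if (field.get(bean) == null) {
                    found.add("propertyPath='" + field.getName() + "', message='may not be null'");
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return found;
    }
}
```

The point of the sketch is the ordering: the check runs before any tuple is processed, so a topic chosen inside `process` cannot satisfy it.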
Re: Datatorrent fault tolerance
So I just changed the malhar-kafka version to 3.5.0, and I was able to import the AbstractOutputOperator. Let me try to launch it now.

Thanks for your inputs !!

On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh wrote:
> Should we use malhar-library version 3.5 then ?
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise wrote:
>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>> This operator is not in malhar-library, it's a separate module.
>>
>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh wrote:
>>> Hi Siyuan,
>>>
>>> I am using the same Kafka producer as you mentioned. But I am not seeing
>>> the AbstractKafkaOutputOperator in malhar library while import.
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com wrote:
>>>> Also which kafka output operator you are using?
>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead of
>>>> com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with
>>>> MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com wrote:
>>>>> Hey Jaspal,
>>>>>
>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>> from malhar? If so please make sure the producer you use here is
>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>> kafka.javaapi.producer.Producer. That is old api and that is not
>>>>> supported by MapR stream.
>>>>>
>>>>> Regards,
>>>>> Siyuan
>>>>>
>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>> Thomas,
>>>>>>
>>>>>> Below is the operator implementation we are trying to run. This
>>>>>> operator is getting an object of tenant class from upstream operator.
>>>>>>
>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>
>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>
>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>
>>>>>>         Gson gson = new Gson();
>>>>>>
>>>>>>         @Override
>>>>>>         public void process(Tenant tenant) {
>>>>>>
>>>>>>             try {
>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>                 long now = System.currentTimeMillis();
>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>                 //ArrayList puts = new ArrayList<>();
>>>>>>                 if (tenant != null) {
>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>                     } else {
>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>>>>>>                             tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>                     }
>>>>>>                     producer.flush();
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>> After building the application, it throws error during launch:
>>>>>>
>>>>>> An error occurred trying to launch the application. Server message:
>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>>>>>>   at java.lang.Class.getDeclaredFields0(Native Method)
>>>>>>   at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>>>>>>   at java.lang.Class.getDeclaredFields(Class.java:1916)
>>>>>>   at
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:
>>>>>>> Thomas,
>>>>>>>
>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>
>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>>>>> need to specify ? Since we are getting an object of class type
>>>>>>> from previous operator.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise wrote:
>>>>>>>> Are you referring to the upstream operator in the DAG or the state of
>>>>>>>> the previous application after relaunch? Since the data is stored in
>>>>>>>> MapR streams, an operator that is a producer can also act as a
>>>>>>>> consumer. Please clarify your question.
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
Re: Datatorrent fault tolerance
Should we use malhar-library version 3.5 then?

Thanks!!

On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise wrote:
> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This
> operator is not in malhar-library, it's a separate module.
>
> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh wrote:
>> Hi Siyuan,
>>
>> I am using the same Kafka producer as you mentioned. But I am not seeing
>> the AbstractKafkaOutputOperator in malhar library while import.
>>
>> Thanks!!
Re: Datatorrent fault tolerance
Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This operator is not in malhar-library, it's a separate module.

On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh wrote:
> Hi Siyuan,
>
> I am using the same Kafka producer as you mentioned. But I am not seeing
> the AbstractKafkaOutputOperator in malhar library while import.
>
> Thanks!!
Re: Datatorrent fault tolerance
Hi Siyuan,

I am using the same Kafka producer as you mentioned, but I am not seeing the AbstractKafkaOutputOperator in the malhar library when importing.

Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com wrote:
> Also, which Kafka output operator are you using?
> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
> Only org.apache.apex.malhar.kafka.AbstractOutputOperator works with
> MapR stream; the latter only works with Kafka 0.8.* or 0.9.
>
> Regards,
> Siyuan
Re: Datatorrent fault tolerance
Also, which Kafka output operator are you using?
Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
Only org.apache.apex.malhar.kafka.AbstractOutputOperator works with MapR stream; the latter only works with Kafka 0.8.* or 0.9.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com wrote:
> Hey Jaspal,
>
> Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
> from malhar? If so, please make sure the producer you use here is
> org.apache.kafka.clients.producer.KafkaProducer instead of
> kafka.javaapi.producer.Producer. That is the old API, and it is not
> supported by MapR stream.
>
> Regards,
> Siyuan
Re: Datatorrent fault tolerance
Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from malhar? If so, please make sure the producer you use here is org.apache.kafka.clients.producer.KafkaProducer instead of kafka.javaapi.producer.Producer. That is the old API, and it is not supported by MapR stream.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh wrote:
> After building the application, it throws error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>   at java.lang.Class.getDeclaredFields0(Native Method)
>   at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>   at java.lang.Class.getDeclaredFields(Class.java:1916)
>
> Thanks
> Jaspal
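One quick way to see which of the two producer classes named above is actually visible at runtime is to ask the class loader directly. The following is a minimal sketch, not part of Malhar or Apex: the class name ProducerApiCheck and the helper isLoadable are hypothetical, and only the two fully qualified producer class names come from the thread.

```java
// Hypothetical helper (not from Malhar): reports whether a class is on the classpath.
final class ProducerApiCheck {
    // New-style producer class recommended in the thread.
    static final String NEW_API = "org.apache.kafka.clients.producer.KafkaProducer";
    // Old producer class named in the NoClassDefFoundError.
    static final String OLD_API = "kafka.javaapi.producer.Producer";

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ProducerApiCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(NEW_API + " loadable: " + isLoadable(NEW_API));
        System.out.println(OLD_API + " loadable: " + isLoadable(OLD_API));
    }
}
```

Run with the same classpath the application is packaged with. If the old class is reported as not loadable while the operator still declares a field of that type, a NoClassDefFoundError at launch like the one quoted above would be the expected symptom.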
Re: Datatorrent fault tolerance
Please post the following:

1. Entire pom.xml
2. Output of "mvn dependency:tree"
3. Output of "jar tvf" run on your application package file (with .apa extension)

Ram

On Fri, Oct 7, 2016 at 10:10 AM, Jaspal Singh wrote:
> Thomas,
>
> We have added the dependency in pom.xml for the Kafka client API and also for
> malhar-kafka. Please highlight if you are specifying some other dependency
> that we need to add.
>
> <dependency>
>   <groupId>org.apache.apex</groupId>
>   <artifactId>malhar-kafka</artifactId>
>   <version>${malhar.version}</version>
>   <exclusions>
>     <exclusion>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka-clients</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.9.0.0-mapr-1602-streams-5.1.0</version>
> </dependency>
>
> Thanks!!
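The "jar tvf" check in item 3 can also be scripted. The sketch below assumes the .apa package is a zip-format archive (which is what being able to run "jar tvf" on it implies) and simply lists entries whose names contain a given fragment such as "kafka". The class name ApaScan and its method are hypothetical, and nothing is assumed about where inside the package the dependency jars live.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper: a scripted stand-in for `jar tvf <package> | grep -i kafka`.
final class ApaScan {
    /** Returns the names of all archive entries that contain the fragment (case-insensitive). */
    static List<String> entriesContaining(String archivePath, String fragment) throws IOException {
        List<String> hits = new ArrayList<>();
        String needle = fragment.toLowerCase();
        try (ZipFile zip = new ZipFile(archivePath)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.toLowerCase().contains(needle)) {
                    hits.add(name);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: ApaScan <package.apa> [fragment]");
            return;
        }
        String fragment = args.length > 1 ? args[1] : "kafka";
        for (String name : entriesContaining(args[0], fragment)) {
            System.out.println(name);
        }
    }
}
```

An empty result for "kafka" would suggest the Kafka client jar did not make it into the package, which is the situation Thomas asks about earlier in the thread.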
Re: Datatorrent fault tolerance
Thomas,

We have added the dependency in pom.xml for the Kafka client API and also for malhar-kafka. Please highlight if you are specifying some other dependency that we need to add.

<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>${malhar.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.0-mapr-1602-streams-5.1.0</version>
</dependency>

Thanks!!

On Fri, Oct 7, 2016 at 12:04 PM, Thomas Weise wrote:
> It looks like the Kafka API dependency is missing. Can you please check it
> is part of the .apa file?
>
> To your previous question: The records/tuples/objects are moved by the
> Apex engine through the stream from operator to operator. There is nothing
> you need to do beyond connecting the operator ports with addStream when you
> specify the DAG.
Re: Datatorrent fault tolerance
It looks like the Kafka API dependency is missing. Can you please check it is part of the .apa file?

To your previous question: The records/tuples/objects are moved by the Apex engine through the stream from operator to operator. There is nothing you need to do beyond connecting the operator ports with addStream when you specify the DAG.

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh wrote:

> After building the application, it throws error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
>     at java.lang.Class.getDeclaredFields0(Native Method)
>     at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>     at java.lang.Class.getDeclaredFields(Class.java:1916)
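The NoClassDefFoundError quoted above names the missing class as a JVM field descriptor ("L...;" with slashes). A minimal sketch, using only the JDK, of turning that descriptor into a class name and checking whether it can be loaded; ClasspathCheck and its methods are hypothetical helpers for illustration, not part of Apex or Malhar:

```java
public class ClasspathCheck {

    /** Converts a JVM field descriptor such as "Lkafka/javaapi/producer/Producer;" to a dotted class name. */
    static String descriptorToClassName(String descriptor) {
        String s = descriptor.trim();
        // Object descriptors have the form L<internal/name>;
        if (s.startsWith("L") && s.endsWith(";")) {
            s = s.substring(1, s.length() - 1);
        }
        return s.replace('/', '.');
    }

    /** Returns true if the class can be located by this class's loader (without initializing it). */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = descriptorToClassName("Lkafka/javaapi/producer/Producer;");
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

This only reports on the classpath of the JVM it runs in. Whether the Kafka jar was actually packaged still has to be verified against the contents of the .apa file itself.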
Re: Datatorrent fault tolerance
Thomas,

Below is the operator implementation we are trying to run. This operator is getting an object of tenant class from the upstream operator.

    public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

        private static final Logger LOG =
            LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

        public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

            Gson gson = new Gson();

            @Override
            public void process(Tenant tenant) {

                try {
                    Producer producer = getKafkaProducer();
                    //ObjectMapper mapper = new ObjectMapper();
                    long now = System.currentTimeMillis();
                    //Configuration conf = HBaseConfiguration.create();
                    //TenantDao dao = new TenantDao(conf);
                    //ArrayList puts = new ArrayList<>();
                    if (tenant != null) {
                        //Tenant tenant = tenant.next();
                        if (StringUtils.isNotEmpty(tenant.getGl())) {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                            //puts.add(dao.mkPut(tenant));
                        } else {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        }
                        producer.flush();
                    }
                }

After building the application, it throws an error during launch:

    An error occurred trying to launch the application. Server message:
    java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer;
        at java.lang.Class.getDeclaredFields0(Native Method)
        at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
        at java.lang.Class.getDeclaredFields(Class.java:1916)
        at

Thanks
Jaspal
Re: Datatorrent fault tolerance
Thomas,

I was trying to refer to the input from previous operator.

Another thing when we extend the AbstractKafkaOutputOperator, do we need to specify ? Since we are getting an object of class type from previous operator.

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise wrote:

> Are you referring to the upstream operator in the DAG or the state of the
> previous application after relaunch? Since the data is stored in MapR
> streams, an operator that is a producer can also act as a consumer. Please
> clarify your question.
Re: Datatorrent fault tolerance
Are you referring to the upstream operator in the DAG or the state of the previous application after relaunch? Since the data is stored in MapR streams, an operator that is a producer can also act as a consumer. Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh wrote:

> Hi Thomas,
>
> I have a question, so when we are using
> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
> maprstream topic will it be able to read messages from the previous
> operator ?
>
> Thanks
> Jaspal
Re: Datatorrent fault tolerance
The KafkaSinglePortExactlyOnceOutputOperator takes whatever the previous operator outputs and writes it to Kafka.

On Oct 7, 2016, at 07:59, Jaspal Singh wrote:

> Hi Thomas,
>
> I have a question, so when we are using
> KafkaSinglePortExactlyOnceOutputOperator to write results into maprstream
> topic will it be able to read messages from the previous operator ?
>
> Thanks
> Jaspal
Re: Datatorrent fault tolerance
Hi Thomas,

I have a question, so when we are using *KafkaSinglePortExactlyOnceOutputOperator* to write results into maprstream topic will it be able to read messages from the previous operator ?

Thanks
Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise wrote:

> For recovery you need to set the window data manager like so:
>
> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>
> That will also apply to stateful restart of the entire application
> (relaunch from previous instance's checkpointed state).
>
> For cold restart, you would need to consider the property you mention and
> decide what is applicable to your use case.
>
> Thomas
>
> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh wrote:
>
>> Ok now I get it. Thanks for the nice explanation !!
>>
>> One more thing, so you mentioned about checkpointing the offset ranges
>> to replay in same order from kafka.
>>
>> Is there any property we need to configure to do that? like initialOffset
>> set to APPLICATION_OR_LATEST.
>>
>> Thanks
>> Jaspal
>>
>> On Thursday, October 6, 2016, Thomas Weise wrote:
>>
>>> What you want is the effect of exactly-once output (that's why we call
>>> it also end-to-end exactly-once). There is no such thing as exactly-once
>>> processing in a distributed system. In this case it would be rather
>>> "produce exactly-once". Upstream operators, on failure, will recover to
>>> checkpointed state and re-process the stream from there. This is
>>> at-least-once, the default behavior. Because in the input operator you have
>>> configured to replay in the same order from Kafka (this is done by
>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>> and the output operator can discard the results that were already published
>>> instead of producing duplicates.
>>>
>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh wrote:
>>>
>>>> I think this is something called a customized operator implementation
>>>> that is taking care of exactly once processing at output.
>>>>
>>>> What if any previous operators fail ? How we can make sure they also
>>>> recover using EXACTLY_ONCE processing mode ?
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Thursday, October 6, 2016, Thomas Weise wrote:
>>>>
>>>>> In that case please have a look at:
>>>>>
>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>
>>>>> The operator will ensure that messages are not duplicated, under the
>>>>> stated assumptions.
>>>>>
>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> In our case we are writing the results back to maprstreams topic based
>>>>>> on some validations.
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Thursday, October 6, 2016, Thomas Weise wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> which operators in your application are writing to external systems?
>>>>>>>
>>>>>>> When you look at the example from the blog
>>>>>>> (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
>>>>>>> there is Kafka input, which is configured to be idempotent. The results
>>>>>>> are written to JDBC. That operator by itself supports exactly-once
>>>>>>> through transactions (in conjunction with idempotent input), hence
>>>>>>> there is no need to configure the processing mode at all.
>>>>>>>
>>>>>>> Thomas
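The recovery behavior described in the quoted thread (replay the input in the same order from a checkpointed offset, and let the output side discard results that were already published) can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not the Malhar implementation: ExactlyOnceSketch, transform, process and simulate are hypothetical names, the "topic" is an in-memory list, and each input offset is assumed to produce exactly one output at the same position.

```java
import java.util.ArrayList;
import java.util.List;

public class ExactlyOnceSketch {

    /** Deterministic per-record computation: replaying the same input yields the same output. */
    static String transform(int record) {
        return "result-" + record;
    }

    /**
     * Processes source offsets [fromOffset, toOffset) and writes results to the topic.
     * With dedupe enabled, a result whose position already exists in the topic is
     * treated as a replay and discarded instead of being published again.
     */
    static void process(List<Integer> source, int fromOffset, int toOffset,
                        List<String> topic, boolean dedupe) {
        for (int offset = fromOffset; offset < toOffset; offset++) {
            String result = transform(source.get(offset));
            boolean alreadyPublished = offset < topic.size();
            if (dedupe && alreadyPublished) {
                continue; // replayed tuple that was published before the failure
            }
            topic.add(result);
        }
    }

    /** Publishes offsets 0..4, "fails", then recovers from a checkpoint taken at offset 3. */
    static List<String> simulate(boolean dedupe) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            source.add(i);
        }
        List<String> topic = new ArrayList<>(); // stands in for the external topic
        int checkpointedOffset = 3;
        process(source, 0, 5, topic, dedupe);                  // run until the failure
        process(source, checkpointedOffset, 8, topic, dedupe); // replay after recovery
        return topic;
    }

    public static void main(String[] args) {
        System.out.println("at-least-once: " + simulate(false));
        System.out.println("exactly-once:  " + simulate(true));
    }
}
```

Without the discard step, offsets 3 and 4 are published twice (at-least-once). With it, the topic ends up with one result per input record, which is the "produce exactly-once" effect. The sketch relies on the replay being in the same order; without that, position-based discarding would not be valid.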