Please post the following:

1. The entire pom.xml
2. The output of "mvn dependency:tree"
3. The output of "jar tvf" run on your application package file (the file with the .apa extension)
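For example (the artifact path and name below are placeholders):

```shell
# Show how kafka-clients is resolved; look for conflicting or omitted versions
mvn dependency:tree | grep -i kafka

# Confirm the Kafka client classes actually made it into the package
jar tvf target/myapp-1.0.0.apa | grep -i kafka
```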
Ram

On Fri, Oct 7, 2016 at 10:10 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

We have added the dependency in pom.xml for the Kafka client API and also for Malhar Kafka. Please point out any other dependency that we need to add.

    <dependency>
      <groupId>org.apache.apex</groupId>
      <artifactId>malhar-kafka</artifactId>
      <version>${malhar.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0-mapr-1602-streams-5.1.0</version>
    </dependency>

Thanks!!

On Fri, Oct 7, 2016 at 12:04 PM, Thomas Weise <t...@apache.org> wrote:

It looks like the Kafka API dependency is missing. Can you please check that it is part of the .apa file?

To your previous question: the records/tuples/objects are moved by the Apex engine through the stream from operator to operator. There is nothing you need to do beyond connecting the operator ports with addStream when you specify the DAG.

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

Below is the operator implementation we are trying to run. This operator receives an object of the Tenant class from the upstream operator.
    public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

        private static final Logger LOG =
            LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

        public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

            private final Gson gson = new Gson();

            @Override
            public void process(Tenant tenant) {
                try {
                    Producer<String, String> producer = getKafkaProducer();
                    if (tenant != null) {
                        if (StringUtils.isNotEmpty(tenant.getGl())) {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        } else {
                            producer.send(new ProducerRecord<String, String>(
                                "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
                                tenant.getVolumeName(), gson.toJson(tenant)));
                        }
                        producer.flush();
                    }
                } catch (Exception e) {
                    LOG.error("Failed to publish tenant record", e);
                }
            }
        };
    }

After building the application, it throws an error during launch:

    An error occurred trying to launch the application. Server message:
    java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
    java.lang.Class.getDeclaredFields0(Native Method) at
    java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
    java.lang.Class.getDeclaredFields(Class.java:1916) at

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Thomas,

I was trying to refer to the input from the previous operator.
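For reference, the addStream wiring Thomas describes might look like the following sketch when populating the DAG. All operator and stream names here are hypothetical, and TenantParser stands in for whatever upstream operator emits Tenant objects:

```java
// Sketch only: the engine moves tuples between operators through streams,
// so the ports just need to be connected when the DAG is specified.
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  KafkaSinglePortInputOperator kafkaIn =
      dag.addOperator("kafkaInput", KafkaSinglePortInputOperator.class);
  TenantParser parser = dag.addOperator("parser", new TenantParser());
  KafkaSinglePortExactlyOnceOutputOperator out =
      dag.addOperator("kafkaOutput", new KafkaSinglePortExactlyOnceOutputOperator());

  dag.addStream("messages", kafkaIn.outputPort, parser.input);
  dag.addStream("tenants", parser.output, out.in);
}
```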
Another thing: when we extend AbstractKafkaOutputOperator, do we need to specify <String, T>, since we are getting an object of a class type from the previous operator?

Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <t...@apache.org> wrote:

Are you referring to the upstream operator in the DAG or to the state of the previous application after relaunch? Since the data is stored in MapR Streams, an operator that is a producer can also act as a consumer. Please clarify your question.

On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

I have a question: when we use KafkaSinglePortExactlyOnceOutputOperator to write results into a MapR Streams topic, will it be able to read messages from the previous operator?

Thanks
Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <t...@apache.org> wrote:

For recovery you need to set the window data manager, like so:

https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33

That will also apply to stateful restart of the entire application (relaunch from the previous instance's checkpointed state).

For cold restart, you would need to consider the property you mention and decide what is applicable to your use case.

Thomas

On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Ok, now I get it. Thanks for the nice explanation!!

One more thing: you mentioned checkpointing the offset ranges to replay in the same order from Kafka.

Is there any property we need to configure to do that?
Like initialOffset set to APPLICATION_OR_LATEST?

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system; in this case it would rather be "produce exactly once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because you have configured the input operator to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent and the output operator can discard the results that were already published instead of producing duplicates.

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

I think this is something called a customized operator implementation that takes care of exactly-once processing at the output.

What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?
Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <thomas.we...@gmail.com> wrote:

In that case please have a look at:

https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java

The operator will ensure that messages are not duplicated, under the stated assumptions.

On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1...@gmail.com> wrote:

Hi Thomas,

In our case we are writing the results back to a MapR Streams topic, based on some validations.

Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <t...@apache.org> wrote:

Hi,

Which operators in your application are writing to external systems?

When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.

Thomas
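To make the duplicate-discard idea concrete, here is a plain-Java sketch (not the Malhar implementation) of an output stage that relies on idempotent replay: each record carries the replayable offset assigned upstream, and on recovery any record at or below the last committed offset is dropped instead of being published again. All names are illustrative; the real operator checkpoints its state and compares against what was already written to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of exactly-once output on top of at-least-once replay:
// discard replayed records rather than publish them twice.
public class ExactlyOnceSink {
    private final List<String> committed = new ArrayList<>(); // stands in for the external system
    private long lastCommittedOffset = -1;                     // checkpointed with the operator state

    public void write(long offset, String record) {
        if (offset <= lastCommittedOffset) {
            return; // replayed after failure: already published, so discard
        }
        committed.add(record);
        lastCommittedOffset = offset;
    }

    public List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        ExactlyOnceSink sink = new ExactlyOnceSink();
        sink.write(0, "a");
        sink.write(1, "b");
        // Simulated recovery: upstream replays offsets 0..2 in the same order.
        sink.write(0, "a");
        sink.write(1, "b");
        sink.write(2, "c");
        System.out.println(sink.committed()); // [a, b, c] -- no duplicates
    }
}
```

Note that this only works because the replay order is deterministic; if the input could replay records in a different order, offsets alone would not be enough to identify what was already published.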