Since you're producing String data to 'myTopic', can you try setting the string serde for that stream in your config?
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.streams.myTopic.samza.msg.serde=string

On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:

> more info - new exception message:
>
> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>
> Updated the diff in pastebin with the changes.
>
> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>
> > Gah! Yeah, those were gone several revisions ago but didn't get nuked in
> > the last iteration.
> >
> > OK, let me do a quick test to see if that was my problem all along.
> >
> > On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:
> >
> >> Hey Ash,
> >> I was referring to the lines before the try block.
> >>
> >>     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >>
> >>     try {
> >>       System.out.println("[DWH] should see this");
> >>       System.out.println(event.getRawEvent());
> >>       …
> >>
> >>
> >> Did you remove those lines as well?
> >>
> >> Navina
> >>
> >> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
> >>
> >> >Just looking at the diff I posted and it's:
> >> >
> >> >
> >> >      try {
> >> >  -     Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >> >  +     System.out.println("[DWH] should see this");
> >> >  +     System.out.println(event.getRawEvent());
> >> >  +     // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >> >
> >> >
> >> >I've removed the Map and added two System.out.println calls. So no, there
> >> >shouldn't be any reference to
> >> >Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >> >in the source java file.
> >> >
> >> >
> >> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >
> >> >> I'm in transit right now but if memory serves me everything should be
> >> >> commented out of that method except for the System.out.println call.
> >> >> I'll be home shortly and can confirm.
> >> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid> wrote:
> >> >>
> >> >>> Hi Ash,
> >> >>> I just ran wikipedia-parser with your patch. Looks like you have set the
> >> >>> message serde correctly in the configs. However, the original code still
> >> >>> converts it into a Map for consumption in the WikipediaFeedEvent.
> >> >>> I am seeing the following (expected):
> >> >>>
> >> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
> >> >>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
> >> >>>     at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
> >> >>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> >> >>>
> >> >>> Did you make the changes to fix this error? Your patch doesn't seem to
> >> >>> have that.
> >> >>> Line 38 Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >> >>>
> >> >>>
> >> >>>
> >> >>> Lmk so I can investigate further.
> >> >>>
> >> >>> Cheers!
> >> >>> Navina
> >> >>>
> >> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
> >> >>>
> >> >>> >If anyone's interested, I've posted a diff of the project here:
> >> >>> >http://pastebin.com/6ZW6Y1Vu
> >> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
> >> >>> >
> >> >>> >if you want to take a stab at it.
> >> >>> >
> >> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >
> >> >>> >> Ok, so very simple test, all running on a local machine, not across
> >> >>> >> networks and all in the hello-samza repo this time around.
> >> >>> >>
> >> >>> >> I've got the datapusher.py file set up to push data into localhost.
> >> >>> >> One event per second.
> >> >>> >> And a modified hello-samza where I've modified the
> >> >>> >> WikipediaParserStreamTask.java class to simply read what's there.
> >> >>> >>
> >> >>> >> Running them both now and I'm seeing in the stderr files
> >> >>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr)
> >> >>> >> the following:
> >> >>> >>
> >> >>> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
> >> >>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >> >>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> >> >>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >> >>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >> >>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
> >> >>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
> >> >>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
> >> >>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >> >>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
> >> >>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
> >> >>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
> >> >>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> >> >>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >> >>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
> >> >>> >>  at [Source: [B@5454d285; line: 1, column: 2]
> >> >>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> >> >>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> >> >>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
> >> >>> >>
> >> >>> >>
> >> >>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while
> >> >>> >> back, but that caused a separate exception. However that was many,
> >> >>> >> MANY attempts ago.
> >> >>> >>
> >> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >>
> >> >>> >>> Ahh, I was going to add it to the run-class.sh script.
> >> >>> >>>
> >> >>> >>> Yeah, it's already there by default:
> >> >>> >>>
> >> >>> >>>
> >> >>> >>> # Metrics
> >> >>> >>> metrics.reporters=snapshot,jmx
> >> >>> >>>
> >> >>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> >> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics
> >> >>> >>>
> >> >>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
> >> >>> >>>
> >> >>> >>> So, where would I see those metrics?
> >> >>> >>>
> >> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >>>
> >> >>> >>>> read: I'm a C++ programmer looking at Java for the first time in > 10
> >> >>> >>>> years
> >> >>> >>>>
> >> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >>>>
> >> >>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
> >> >>> >>>>>
> >> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:
> >> >>> >>>>>
> >> >>> >>>>>> Hey Ash,
> >> >>> >>>>>>
> >> >>> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined)
> >> >>> >>>>>> to see if your job is actually doing anything ? My only guess at
> >> >>> >>>>>> this point is the process method is not being called because
> >> >>> >>>>>> somehow there's no incoming data. I could be totally wrong of course.
> >> >>> >>>>>>
> >> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >>>>>>
> >> >>> >>>>>> > Just to be clear, here's what's changed from the default
> >> >>> >>>>>> > hello-samza repo:
> >> >>> >>>>>> >
> >> >>> >>>>>> > wikipedia-parser.properties==========================
> >> >>> >>>>>> > task.inputs=kafka.myTopic
> >> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
> >> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
> >> >>> >>>>>> >
> >> >>> >>>>>> > WikipediaParserStreamTask.java =====================
> >> >>> >>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >> >>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >> >>> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >> >>> >>>>>> >
> >> >>> >>>>>> >     try {
> >> >>> >>>>>> >       System.out.println(event.getRawEvent());
> >> >>> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >> >>> >>>>>> >
> >> >>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
> >> >>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
> >> >>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
> >> >>> >>>>>> >
> >> >>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
> >> >>> >>>>>> >
> >> >>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
> >> >>> >>>>>> >
> >> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a
> >> >>> >>>>>> > sentence.
> >> >>> >>>>>> >
> >> >>> >>>>>> >
> >> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >>>>>> >
> >> >>> >>>>>> > > yep, modified log4j.xml to look like this:
> >> >>> >>>>>> > >
> >> >>> >>>>>> > >   <root>
> >> >>> >>>>>> > >     <priority value="debug" />
> >> >>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
> >> >>> >>>>>> > >     <appender-ref ref="jmx" />
> >> >>> >>>>>> > >   </root>
> >> >>> >>>>>> > >
> >> >>> >>>>>> > > Not sure what you mean by #2.
> >> >>> >>>>>> > >
> >> >>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still
> >> >>> >>>>>> > > not seeing any output from System.out.println(...)
> >> >>> >>>>>> > >
> >> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid> wrote:
> >> >>> >>>>>> > >
> >> >>> >>>>>> > >> Hey Ash,
> >> >>> >>>>>> > >>     1. Did you happen to modify your log4j.xml ?
> >> >>> >>>>>> > >>     2. Can you print the class path that was printed when the
> >> >>> >>>>>> > >> job started ? I am wondering if log4j was not loaded or not
> >> >>> >>>>>> > >> present in the path where it's looking for. If you have been
> >> >>> >>>>>> > >> using hello samza, it should have pulled it from Maven.
> >> >>> >>>>>> > >>
> >> >>> >>>>>> > >> Thanks,
> >> >>> >>>>>> > >> Naveen
> >> >>> >>>>>> > >>
> >> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> >> >>> >>>>>> > >>
> >> >>> >>>>>> > >> > Hey all,
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to
> >> >>> >>>>>> > >> > parse a simple kafka topic that I've produced through an
> >> >>> >>>>>> > >> > external java app (nothing other than a series of sentences)
> >> >>> >>>>>> > >> > and it's failing pretty hard for me. The base 'hello-samza' set
> >> >>> >>>>>> > >> > of apps works fine, but as soon as I change the configuration
> >> >>> >>>>>> > >> > to look at a different Kafka/zookeeper I get the following in
> >> >>> >>>>>> > >> > the userlogs:
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > The modifications are pretty straightforward. In the
> >> >>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the following:
> >> >>> >>>>>> > >> > task.inputs=kafka.myTopic
> >> >>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> >> >>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
> >> >>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
> >> >>> >>>>>> > >> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >> >>> >>>>>> > >> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >> >>> >>>>>> > >> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> >     try {
> >> >>> >>>>>> > >> >       System.out.println(event.getRawEvent());
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > And then following the compile/extract/run process outlined in
> >> >>> >>>>>> > >> > the hello-samza website.
> >> >>> >>>>>> > >> >
> >> >>> >>>>>> > >> > Any thoughts? I've looked online for any 'super simple'
> >> >>> >>>>>> > >> > examples of ingesting kafka in samza with very little success.
> >> >>> >>>>>> > >>
> >> >>> >>>>>> > >
> >> >>> >>>>>> >
> >> >>> >>>>>>
> >> >>> >>>>>> --
> >> >>> >>>>>> Thanks and regards
> >> >>> >>>>>>
> >> >>> >>>>>> Chinmay Soman
> >> >>> >>>>>>
> >> >>> >>>>>
> >> >>> >>>>
> >> >>> >>>
> >> >>> >>
> >> >>>
> >>
> >
>

--
Thanks and regards

Chinmay Soman
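
P.S. A footnote on the serde suggestion at the top of this message. Once the string serde is in effect for myTopic, I'd expect envelope.getMessage() to hand the task a String rather than a Map, so the (Map<String, Object>) cast in WikipediaParserStreamTask would still hit the ClassCastException Navina reported. Here is a minimal, Samza-free sketch of checking the payload type before using it. SerdeCastSketch and describe are hypothetical names made up for illustration, and the Object parameter only stands in for whatever envelope.getMessage() returns; this is not the actual hello-samza code.

```java
import java.util.HashMap;
import java.util.Map;

public class SerdeCastSketch {

    // Hypothetical helper: 'message' stands in for the Object that
    // envelope.getMessage() returns. Its runtime type depends on the serde
    // configured for the stream (assumption: String for the string serde,
    // Map for the json serde).
    static String describe(Object message) {
        if (message instanceof String) {
            // String serde: the payload is the raw sentence itself.
            return "string payload: " + message;
        }
        if (message instanceof Map) {
            // JSON serde: the payload has already been parsed into a map.
            return "map payload with " + ((Map<?, ?>) message).size() + " keys";
        }
        String type = (message == null) ? "null" : message.getClass().getName();
        return "unexpected payload type: " + type;
    }

    public static void main(String[] args) {
        // A plain sentence, like the ones pushed into 'myTopic'.
        System.out.println(describe("My sentence"));

        // A parsed JSON object, like the stock wikipedia feed events.
        Map<String, Object> json = new HashMap<>();
        json.put("channel", "#en.wikipedia");
        System.out.println(describe(json));
    }
}
```

In the real task the same idea would mean dropping the Map cast and the WikipediaFeedEvent construction when the input is a plain sentence, and printing the String directly.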