Gah!  Yeah, those were gone several revisions ago but didn't get nuked in
the last iteration.

OK, let me do a quick test to see if that was my problem all along.
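
For reference, here's roughly what I'm aiming for with that test -- a minimal
sketch only, and it assumes systems.kafka.samza.msg.serde=string is in effect so
that envelope.getMessage() comes back as a plain String rather than a Map:

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // With the string serde the payload arrives as a String, so no Map cast
    // and no WikipediaFeedEvent are needed just to echo the message.
    String rawMessage = (String) envelope.getMessage();
    System.out.println("[DWH] should see this");
    System.out.println(rawMessage);
  }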

On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <nram...@linkedin.com.invalid> wrote:

> Hey Ash,
> I was referring to the lines before the try block.
>
>     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>
>     try {
>       System.out.println("[DWH] should see this");
>       System.out.println(event.getRawEvent());
> …
>
>
> Did you remove those lines as well?
>
> Navina
>
> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
>
> >Just looking at the diff I posted and it's:
> >
> >        try {
> >   -      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >   +      System.out.println("[DWH] should see this");
> >   +      System.out.println(event.getRawEvent());
> >   +      // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >
> >
> >I've removed the Map and added two System.out.println calls.  So no, there
> >shouldn't be any reference to
> >Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >in the source java file.
> >
> >
> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.mathe...@gmail.com>
> >wrote:
> >
> >> I'm in transit right now but if memory serves me everything should be
> >> commented out of that method except for the System.out.println call.
> >>I'll
> >> be home shortly and can confirm.
> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nram...@linkedin.com.invalid>
> >> wrote:
> >>
> >>> Hi Ash,
> >>> I just ran wikipedia-parser with your patch. Looks like you have set the
> >>> message serde correctly in the configs. However, the original code still
> >>> converts it into a Map for consumption in the WikipediaFeedEvent.
> >>> I am seeing the following (expected):
> >>>
> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
> >>> exception in thread (name=main). Exiting process now.
> >>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
> >>>   at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
> >>>   at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> >>>
> >>> Did you make the changes to fix this error? Your patch doesn't seem to
> >>> have that.
> >>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
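> >>>
> >>> In other words (rough sketch, assuming the string serde stays in place): the
> >>> container hands process() a plain String, so that cast has to go, e.g.
> >>>
> >>>   String raw = (String) envelope.getMessage();   // string serde: payload is a String
> >>>   // Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();  // only valid with the json serde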
> >>>
> >>>
> >>>
> >>> Lmk so I can investigate further.
> >>>
> >>> Cheers!
> >>> Navina
> >>>
> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.mathe...@gmail.com> wrote:
> >>>
> >>> >If anyone's interested, I've posted a diff of the project here:
> >>> >http://pastebin.com/6ZW6Y1Vu
> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
> >>> >
> >>> >if you want to take a stab at it.
> >>> >
> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson
> >>><ash.mathe...@gmail.com>
> >>> >wrote:
> >>> >
> >>> >> Ok, so very simple test, all running on a local machine, not across
> >>> >> networks and all in the hello-samza repo this time around.
> >>> >>
> >>> >> I've got the datapusher.py file set up to push data into localhost. One
> >>> >> event per second.
> >>> >> And a modified hello-samza where I've modified the
> >>> >> WikipediaParserStreamTask.java class to simply read what's there.
> >>> >>
> >>> >> Running them both now and I'm seeing in the stderr files
> >>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
> >>> >> following:
> >>> >>
> >>> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
> >>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> >>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
> >>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
> >>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
> >>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
> >>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
> >>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
> >>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> >>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
> >>> >>  at [Source: [B@5454d285; line: 1, column: 2]
> >>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
> >>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
> >>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
> >>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> >>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> >>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
> >>> >>
> >>> >>
> >>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
> >>> >> but that caused a separate exception.  However that was many, MANY attempts
> >>> >> ago.
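> >>> >>
> >>> >> For what it's worth, if I go back to plain-text messages I believe the config
> >>> >> would need something like the following (sketch from memory -- the exact
> >>> >> factory class should be checked against the Samza version in hello-samza):
> >>> >>
> >>> >>   serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >>> >>   systems.kafka.samza.msg.serde=string
> >>> >>
> >>> >> and then process() would have to stop casting the message to
> >>> >> Map<String, Object>, which is presumably the separate exception I was hitting.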
> >>> >>
> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <
> >>> ash.mathe...@gmail.com>
> >>> >> wrote:
> >>> >>
> >>> >>> Ahh, I was going to add it to the run-class.sh script.
> >>> >>>
> >>> >>> Yeah, it's already there by default:
> >>> >>>
> >>> >>>
> >>> >>> # Metrics
> >>> >>> metrics.reporters=snapshot,jmx
> >>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics
> >>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
> >>> >>>
> >>> >>> So, where would I see those metrics?
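> >>> >>>
> >>> >>> (If it helps, my understanding is that the snapshot reporter writes to the
> >>> >>> "metrics" topic on the kafka system, so -- assuming the standard hello-samza
> >>> >>> grid layout -- something like this should show them:
> >>> >>>
> >>> >>>   deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic metrics
> >>> >>>
> >>> >>> The jmx reporter would instead be visible through a JMX client such as jconsole.)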
> >>> >>>
> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
> >>> >>><ash.mathe...@gmail.com>
> >>> >>> wrote:
> >>> >>>
> >>> >>>> read: I'm a C++ programmer looking at Java for the first time in 10 years
> >>> >>>>
> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson
> >>> >>>><ash.mathe...@gmail.com>
> >>> >>>> wrote:
> >>> >>>>
> >>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
> >>> >>>>>
> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
> >>> >>>>> chinmay.cere...@gmail.com> wrote:
> >>> >>>>>
> >>> >>>>>> Hey Ash,
> >>> >>>>>>
> >>> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to see
> >>> >>>>>> if your job is actually doing anything? My only guess at this point is that
> >>> >>>>>> the process method is not being called because somehow there's no incoming
> >>> >>>>>> data. I could be totally wrong of course.
> >>> >>>>>>
> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
> >>> >>>>>> ash.mathe...@gmail.com>
> >>> >>>>>> wrote:
> >>> >>>>>>
> >>> >>>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
> >>> >>>>>> >
> >>> >>>>>> > wikipedia-parser.properties ==========================
> >>> >>>>>> > task.inputs=kafka.myTopic
> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
> >>> >>>>>> >
> >>> >>>>>> > WikipediaParserStreamTask.java =====================
> >>> >>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >>> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >>> >>>>>> >
> >>> >>>>>> >     try {
> >>> >>>>>> >       System.out.println(event.getRawEvent());
> >>> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> >>> >>>>>> >
> >>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
> >>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
> >>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
> >>> >>>>>> >
> >>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
> >>> >>>>>> >
> >>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
> >>> >>>>>> >
> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
> >>> >>>>>> >
> >>> >>>>>> >
> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
> >>> >>>>>> ash.mathe...@gmail.com>
> >>> >>>>>> > wrote:
> >>> >>>>>> >
> >>> >>>>>> > > yep, modified log4j.xml to look like this:
> >>> >>>>>> > >
> >>> >>>>>> > >   <root>
> >>> >>>>>> > >     <priority value="debug" />
> >>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
> >>> >>>>>> > >     <appender-ref ref="jmx" />
> >>> >>>>>> > >   </root>
> >>> >>>>>> > >
> >>> >>>>>> > > Not sure what you mean by #2.
> >>> >>>>>> > >
> >>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still not
> >>> >>>>>> > > seeing any output from System.out.println(...)
> >>> >>>>>> > >
> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
> >>> >>>>>> > > nsomasunda...@linkedin.com.invalid> wrote:
> >>> >>>>>> > >
> >>> >>>>>> > >> Hey Ash,
> >>> >>>>>> > >>                1. Did you happen to modify your log4j.xml?
> >>> >>>>>> > >>                2. Can you print the class path that was printed when the
> >>> >>>>>> > >> job started? I am wondering if log4j was not loaded or not present in
> >>> >>>>>> > >> the path where it's looking. If you have been using hello-samza, it
> >>> >>>>>> > >> should have pulled it from Maven.
> >>> >>>>>> > >>
> >>> >>>>>> > >> Thanks,
> >>> >>>>>> > >> Naveen
> >>> >>>>>> > >>
> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
> >>> >>>>>> ash.mathe...@gmail.com>
> >>> >>>>>> > >> wrote:
> >>> >>>>>> > >>
> >>> >>>>>> > >> > Hey all,
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > Evaluating Samza currently and am running into some odd
> >>> >>>>>>issues.
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
> >>> >>>>>> > >> > simple kafka topic that I've produced through an external java app (nothing
> >>> >>>>>> > >> > other than a series of sentences) and it's failing pretty hard for me. The
> >>> >>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon as I change the
> >>> >>>>>> > >> > configuration to look at a different Kafka/zookeeper I get the following in
> >>> >>>>>> > >> > the userlogs:
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
> >>> >>>>>> > >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
> >>> >>>>>> > >> > metadata for topics [Set(myTopic)] from broker
> >>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> >>> >>>>>> > >> >
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > The modifications are pretty straightforward.  In the
> >>> >>>>>> > >> > wikipedia-parser.properties, I've changed the following:
> >>> >>>>>> > >> > task.inputs=kafka.myTopic
> >>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> >>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
> >>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java:
> >>> >>>>>> > >> >  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >>> >>>>>> > >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >>> >>>>>> > >> >
> >>> >>>>>> > >> >    try {
> >>> >>>>>> > >> >        System.out.println(event.getRawEvent());
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > And then following the compile/extract/run process outlined on the
> >>> >>>>>> > >> > hello-samza website.
> >>> >>>>>> > >> >
> >>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super simple' examples of
> >>> >>>>>> > >> > ingesting kafka in samza with very little success.
> >>> >>>>>> > >>
> >>> >>>>>> > >>
> >>> >>>>>> > >
> >>> >>>>>> >
> >>> >>>>>>
> >>> >>>>>>
> >>> >>>>>>
> >>> >>>>>> --
> >>> >>>>>> Thanks and regards
> >>> >>>>>>
> >>> >>>>>> Chinmay Soman
> >>> >>>>>>
> >>> >>>>>
> >>> >>>>>
> >>> >>>>
> >>> >>>
> >>> >>
> >>>
> >>>
>
>
