OK, so a very simple test this time: everything runs on a local machine (no networking involved), and everything is in the hello-samza repo.
I've got the datapusher.py script set up to push data to localhost, one event per second, and a modified hello-samza in which I've changed the WikipediaParserStreamTask.java class to simply read what's there. With both running, I see the following in the stderr files (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr):

Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
	at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
	at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
	at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
	at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
	at scala.collection.SetLike$class.map(SetLike.scala:93)
	at scala.collection.AbstractSet.map(Set.scala:47)
	at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
	at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
	at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
	at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
	at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
	at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
	at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
	at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
	at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
	at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
	at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
	at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: [B@5454d285; line: 1, column: 2]
	at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
	at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
	at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
	at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
	at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
	at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
	at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
	at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
	at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
	at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
	at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
	at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)

I changed systems.kafka.samza.msg.serde from json to string a while back, but that caused a separate exception.
However, that was many, MANY attempts ago.

On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
> Ahh, I was going to add it to the run-class.sh script.
>
> Yeah, it's already there by default:
>
> # Metrics
> metrics.reporters=snapshot,jmx
> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> metrics.reporter.snapshot.stream=kafka.metrics
> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>
> So, where would I see those metrics?
>
> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>
>> read: I'm a C++ programmer looking at Java for the first time in > 10 years
>>
>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>
>>> I'm assuming I have Jmx defined ... where would that get set?
>>>
>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cere...@gmail.com> wrote:
>>>
>>>> Hey Ash,
>>>>
>>>> Can you see your job metrics (if you have the Jmx metrics defined) to see if your job is actually doing anything? My only guess at this point is the process method is not being called because somehow there's no incoming data. I could be totally wrong of course.
>>>>
>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>>
>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
>>>> >
>>>> > wikipedia-parser.properties==========================
>>>> > task.inputs=kafka.myTopic
>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>>>> >
>>>> > WikipediaParserStreamTask.java =====================
>>>> > public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>> >   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>> >   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>> >
>>>> >   try {
>>>> >     System.out.println(event.getRawEvent());
>>>> >     // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>> >
>>>> >     // parsedJsonObject.put("channel", event.getChannel());
>>>> >     // parsedJsonObject.put("source", event.getSource());
>>>> >     // parsedJsonObject.put("time", event.getTime());
>>>> >
>>>> >     // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>>> >
>>>> > as well as the aforementioned changes to the log4j.xml file.
>>>> >
>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
>>>> >
>>>> >
>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>> >
>>>> > > yep, modified log4j.xml to look like this:
>>>> > >
>>>> > > <root>
>>>> > >   <priority value="debug" />
>>>> > >   <appender-ref ref="RollingAppender"/>
>>>> > >   <appender-ref ref="jmx" />
>>>> > > </root>
>>>> > >
>>>> > > Not sure what you mean by #2.
>>>> > >
>>>> > > However, I'm running now, not seeing any exceptions, but still not seeing any output from System.out.println(...)
>>>> > >
>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <nsomasunda...@linkedin.com.invalid> wrote:
>>>> > >
>>>> > >> Hey Ash,
>>>> > >> 1. Did you happen to modify your log4j.xml?
>>>> > >> 2. Can you print the class path that was printed when the job started? I am wondering if log4j was not loaded or not present in the path where it's looking for it. If you have been using hello-samza, it should have pulled it from Maven.
>>>> > >>
>>>> > >> Thanks,
>>>> > >> Naveen
>>>> > >>
>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote:
>>>> > >>
>>>> > >> > Hey all,
>>>> > >> >
>>>> > >> > Evaluating Samza currently and am running into some odd issues.
>>>> > >> >
>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a simple Kafka topic that I've produced through an external Java app (nothing other than a series of sentences), and it's failing pretty hard for me. The base 'hello-samza' set of apps works fine, but as soon as I change the configuration to look at a different Kafka/ZooKeeper I get the following in the userlogs:
>>>> > >> >
>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>> > >> >
>>>> > >> >
>>>> > >> > The modifications are pretty straightforward.
>>>> > >> > In the wikipedia-parser.properties, I've changed the following:
>>>> > >> > task.inputs=kafka.myTopic
>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>>>> > >> >
>>>> > >> > and in the actual Java file WikipediaParserStreamTask.java
>>>> > >> > public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>> > >> >   Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>> > >> >   WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>> > >> >
>>>> > >> >   try {
>>>> > >> >     System.out.println(event.getRawEvent());
>>>> > >> >
>>>> > >> > And then I followed the compile/extract/run process outlined on the hello-samza website.
>>>> > >> >
>>>> > >> > Any thoughts? I've looked online for any 'super simple' examples of ingesting Kafka in Samza with very little success.
>>>>
>>>> --
>>>> Thanks and regards
>>>>
>>>> Chinmay Soman
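The exception at the top of this thread is raised during deserialization, before process() ever runs: JsonSerde.fromBytes receives a plain sentence whose first character is 'M', and Jackson rejects it because a JSON value must start with a number, string, array, object, true, false or null. If the input stream is switched to a string serde, envelope.getMessage() returns a String instead, so the existing cast to Map<String, Object> would presumably be the next thing to fail; that is only a guess at the "separate exception", which the thread doesn't show. The following is a minimal, dependency-free Java sketch of both halves (it uses no Samza or Jackson classes, and SentenceTaskSketch and its methods are hypothetical names): a rough check mirroring Jackson's list of valid starting tokens, and a process()-style handler that branches on the message type instead of casting blindly. On the Samza side this would pair with a string serde for the input stream, for example serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory plus systems.kafka.streams.myTopic.samza.msg.serde=string; check those property names against the configuration reference for the Samza version in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Dependency-free sketch: no Samza or Jackson classes are used, and every
// name here is hypothetical. It only models the types a task would see.
public class SentenceTaskSketch {

    // Rough heuristic mirroring Jackson's error text: a JSON value must start
    // with a number, a string, an array, an object, true, false or null.
    static boolean startsLikeJsonValue(String text) {
        String t = text.trim();
        if (t.isEmpty()) {
            return false;
        }
        char c = t.charAt(0);
        return c == '{' || c == '[' || c == '"' || c == '-'
            || (c >= '0' && c <= '9')
            || t.startsWith("true") || t.startsWith("false") || t.startsWith("null");
    }

    // Roughly what a UTF-8 string serde does on the consumer side:
    // turn the raw Kafka message bytes into a java.lang.String.
    static Object decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Stand-in for the body of process(): branch on the runtime type of
    // envelope.getMessage() instead of casting straight to Map.
    static String handle(Object message) {
        if (message instanceof String) {
            return (String) message; // plain sentence: use it as-is
        }
        if (message instanceof Map) {
            return "json keys " + ((Map<?, ?>) message).keySet();
        }
        throw new IllegalArgumentException("Unexpected message type: "
            + (message == null ? "null" : message.getClass().getName()));
    }

    public static void main(String[] args) {
        // Hypothetical payload; the real sentences from datapusher.py are not shown.
        byte[] raw = "My hypothetical sentence".getBytes(StandardCharsets.UTF_8);
        String text = new String(raw, StandardCharsets.UTF_8);
        System.out.println("looks like JSON? " + startsLikeJsonValue(text)); // false: starts with 'M'

        Object message = decodeAsString(raw);
        System.out.println(handle(message));

        // The original task code assumes the json serde; with a String it throws.
        try {
            @SuppressWarnings("unchecked")
            Map<String, Object> jsonObject = (Map<String, Object>) message;
            System.out.println(jsonObject.size());
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as expected for a String message");
        }
    }
}
```

Running main prints false for the JSON check, then the sentence itself, then the ClassCastException line. Branching on the runtime type keeps the handler working whichever serde is configured, at the cost of hiding a serde misconfiguration that a hard failure would surface immediately; with a string serde, WikipediaFeedEvent (which takes a Map) would simply not be constructed.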