Hey Ash, Can you see your job metrics (if you have the Jmx metrics defined) to see if your job is actually doing anything ? My only guess at this point is the process method is not being called because somehow there's no incoming data. I could be totally wrong of course.
On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.mathe...@gmail.com> wrote: > Just to be clear, here's what's changed from the default hello-samza repo: > > wikipedia-parser.properties========================== > task.inputs=kafka.myTopic > systems.kafka.consumer.zookeeper.connect= > ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/ > systems.kafka.consumer.auto.offset.reset=smallest > > WikipediaParserStreamTask.java ===================== > public void process(IncomingMessageEnvelope envelope, MessageCollector > collector, TaskCoordinator coordinator) { > Map<String, Object> jsonObject = (Map<String, Object>) > envelope.getMessage(); > WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); > > try { > System.out.println(event.getRawEvent()); > // Map<String, Object> parsedJsonObject = parse(event.getRawEvent()); > > // parsedJsonObject.put("channel", event.getChannel()); > // parsedJsonObject.put("source", event.getSource()); > // parsedJsonObject.put("time", event.getTime()); > > // collector.send(new OutgoingMessageEnvelope(new > SystemStream("kafka", "wikipedia-edits"), parsedJsonObject)); > > as well as the aforementioned changes to the log4j.xml file. > > The data pushed into the 'myTopic' topic is nothing more than a sentence. > > > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.mathe...@gmail.com> > wrote: > > > yep, modified log4j.xml to look like this: > > > > <root> > > <priority value="debug" /> > > <appender-ref ref="RollingAppender"/> > > <appender-ref ref="jmx" /> > > </root> > > > > Not sure what you mean by #2. > > > > However, I'm running now, not seeing any exceptions, but still not seeing > > any output from System.out.println(...) > > > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram < > > nsomasunda...@linkedin.com.invalid> wrote: > > > >> Hey Ash, > >> 1. Did you happen to modify your log4j.xml ? > >> 2. Can you print the class path that was printed when the > >> job started ? I am wondering if log4j was not loaded or not present in > the > >> path where it’s looking for. If you have been using hello samza, it > should > >> have pulled it from Maven. > >> > >> Thanks, > >> Naveen > >> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> > >> wrote: > >> > >> > Hey all, > >> > > >> > Evaluating Samza currently and am running into some odd issues. > >> > > >> > I'm currently working off the 'hello-samza' repo and trying to parse a > >> > simple kafka topic that I've produced through an extenal java app > >> (nothing > >> > other than a series of sentences) and it's failing pretty hard for me. > >> The > >> > base 'hello-samza' set of apps works fine, but as soon as I change the > >> > configuration to look at a different Kafka/zookeeper I get the > >> following in > >> > the userlogs: > >> > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last > offsets > >> > for streams [myTopic] due to kafka.common.KafkaException: fetching > topic > >> > metadata for topics [Set(myTopic)] from broker > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying. > >> > > >> > > >> > The modifications are pretty straightforward. In the > >> > Wikipedia-parser.properties, I've changed the following: > >> > task.inputs=kafka.myTopic > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/ > >> > systems.kafka.consumer.auto.offset.reset=smallest > >> > systems.kafka.producer.metadata.broker.list=redacted:9092 > >> > > >> > and in the actual java file WikipediaParserStreamTask.java > >> > public void process(IncomingMessageEnvelope envelope, > MessageCollector > >> > collector, TaskCoordinator coordinator) { > >> > Map<String, Object> jsonObject = (Map<String, Object>) > >> > envelope.getMessage(); > >> > WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); > >> > > >> > try { > >> > System.out.println(event.getRawEvent()); > >> > > >> > And then following the compile/extract/run process outlined in the > >> > hello-samza website. > >> > > >> > Any thoughts? I've looked online for any 'super simple' examples of > >> > ingesting kafka in samza with very little success. > >> > >> > > > -- Thanks and regards Chinmay Soman