Also, here's the producer: http://pastebin.com/qMNJabTZ
On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson <ash.mathe...@gmail.com> wrote: > Yep, first thing I checked (got bitten by that earlier in the week with no > data actually in the topic). > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay Soman <chinmay.cere...@gmail.com > > wrote: > >> Can you double check that you can read data from your Kafka broker ? >> >> > ./deploy/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 >> --topic myTopic >> > ./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 >> --topic myTopic --from-beginning >> >> I've seen cases where if the Kafka broker isn't shutdown properly, >> something like this happens. >> >> On Sun, Mar 22, 2015 at 10:35 AM, Ash W Matheson <ash.mathe...@gmail.com> >> wrote: >> >> > Hey all, >> > >> > Evaluating Samza currently and am running into some odd issues. >> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a >> > simple kafka topic that I've produced through an extenal java app >> (nothing >> > other than a series of sentences) and it's failing pretty hard for me. >> The >> > base 'hello-samza' set of apps works fine, but as soon as I change the >> > configuration to look at a different Kafka/zookeeper I get the >> following in >> > the userlogs: >> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic >> > metadata for topics [Set(myTopic)] from broker >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying. >> > >> > >> > The modifications are pretty straightforward. In the >> > Wikipedia-parser.properties, I've changed the following: >> > task.inputs=kafka.myTopic >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/ >> > systems.kafka.consumer.auto.offset.reset=smallest >> > systems.kafka.producer.metadata.broker.list=redacted:9092 >> > >> > and in the actual java file WikipediaParserStreamTask.java >> > public void process(IncomingMessageEnvelope envelope, MessageCollector >> > collector, TaskCoordinator coordinator) { >> > Map<String, Object> jsonObject = (Map<String, Object>) >> > envelope.getMessage(); >> > WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject); >> > >> > try { >> > System.out.println(event.getRawEvent()); >> > >> > And then following the compile/extract/run process outlined in the >> > hello-samza website. >> > >> > Any thoughts? I've looked online for any 'super simple' examples of >> > ingesting kafka in samza with very little success. >> > >> >> >> >> -- >> Thanks and regards >> >> Chinmay Soman >> > >