Hello, I am able to read messages off of a new Kafka topic now. The next task is to enrich the data with more information. The data that is flowing in carries an IP address or host name. I have a Redis cache that holds more contextual information keyed by that host (like the owner of the alert, SLA, etc.), and the data in Redis does not change often. This pretty much becomes a stream-table join. Alternatively, I could dump the same Redis data to a different Kafka topic and make it a stream-stream join instead.
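To make the stream-table option concrete, here is a rough sketch of what I have in mind (just a sketch: I am assuming Jedis as the Redis client, a JSON serde on the input topic that hands the task a Map per message, and placeholder names for the task class, topics, config keys, and fields):

import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

import redis.clients.jedis.Jedis;

// Sketch: enrich each incoming alert with context (owner, SLA, ...) looked up
// in Redis, keyed by the host name / IP carried in the message. Names are placeholders.
public class AlertEnrichmentTask implements StreamTask, InitableTask {
  private static final SystemStream OUTPUT = new SystemStream("kafka", "alerts-enriched");

  private Jedis redis;

  @Override
  public void init(Config config, TaskContext context) {
    // One Redis connection per task instance; host/port come from hypothetical config keys.
    redis = new Jedis(config.get("task.redis.host", "localhost"),
                      config.getInt("task.redis.port", 6379));
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // Assumes the serde on the input stream deserializes each message into a Map.
    Map<String, Object> alert = (Map<String, Object>) envelope.getMessage();
    String host = (String) alert.get("host"); // IP address or host name

    // Look up the (rarely changing) context for this host; empty map if unknown.
    Map<String, String> context = redis.hgetAll("alert-context:" + host);
    if (!context.isEmpty()) {
      alert.put("owner", context.get("owner"));
      alert.put("sla", context.get("sla"));
    }

    collector.send(new OutgoingMessageEnvelope(OUTPUT, alert));
  }
}

Since the Redis data changes rarely, I could also memoize the lookups in the task to avoid a Redis round trip per message. The stream-stream alternative would instead push the same Redis data into its own Kafka topic and join against it inside the job.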
What do you guys recommend?

- Shekar

On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <[email protected]> wrote:

> Hey Guys,
>
> I don't know a whole lot about Fluentd, but if you don't want to do this
> flow:
>
> Fluentd -> Kafka -> Samza
>
> Then the alternative is:
>
> Fluentd -> Samza
>
> The "direct" approach (no Kafka) is going to be pretty labor intensive to
> build. You'd have to:
>
> 1. Implement a FluentdSystemConsumer for Samza.
> 2. Write a Fluentd data output plugin, which sends to the
> FluentdSystemConsumer.
> 3. Figure out a way for the Fluentd data output plugin to "discover" where
> the Samza FluentdSystemConsumer is located (since SamzaContainers are
> deployed to dynamic hosts in YARN, and move around a lot).
> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
> class (similar to the WikipediaSystemFactory in hello-samza).
> 5. Decide on some partitioning model that makes sense for Fluentd. Maybe
> one partition = one host? Not sure how Fluentd works here.
>
> My instinct is that it's going to be *far* better to use the first
> approach (pipe the Fluentd events into Kafka). This will give you all of
> the semantics that Kafka provides (e.g. ordering within a partition,
> rewinding streams, durability, etc.).
>
> Cheers,
> Chris
>
> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>
> > I was also thinking of having fluentd push to Samza, but don't know how to
> > implement this. Not sure if adding a kafka layer between Samza and fluentd
> > is the only option.
> >
> > Do other guys have better ideas?
> >
> > Thanks,
> >
> > Fang, Yan
> > [email protected]
> > +1 (206) 849-4108
> >
> > On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
> >
> >> Yan,
> >>
> >> Won't it add an additional hop? It did occur to me earlier, but I was not
> >> sure if this is the right way to go if we have a stringent SLA-driven
> >> system depending on it.
> >>
> >> - Shekar
> >>
> >> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
> >>
> >> > If you already put the events into kafka, you can make Samza accept
> >> > the kafka topic, like the wikipedia-parse project in hello-samza accepts
> >> > the kafka topic wikipedia-raw (see the config file).
> >> >
> >> > Thanks,
> >> >
> >> > Fang, Yan
> >> > [email protected]
> >> > +1 (206) 849-4108
> >> >
> >> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
> >> >
> >> > > Awesome .. this works. Thanks a lot.
> >> > >
> >> > > Now off to my next step. I want to point to an incoming stream of
> >> > > events. These events are routed via fluentd, so fluentd acts as a
> >> > > routing layer that pushes the events to kafka. Since it is a push and
> >> > > not a pull, any pointers on how to push it to Samza? Guessing I need a
> >> > > listener on Samza to collect this?
> >> > >
> >> > > - Shekar
> >> > >
> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
> >> > >
> >> > > > Aha, yes, we are almost there. I think I made a mistake in the
> >> > > > previous email.
> >> > > >
> >> > > > 1. Modify *wikipedia-parser.properties*, NOT *wikipedia-feed.properties*.
> >> > > > 2. Run deploy/samza/bin/run-job.sh
> >> > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> >> > > > --config-path=file://$PWD/deploy/samza/config/*wikipedia-parser.properties*
> >> > > > (NOT *wikipedia-feed.properties*)
> >> > > >
> >> > > > Then you should see the messages in the kafka topic *wikipedia-edits*.
> >> > > >
> >> > > > Thanks. Let me know if you have any luck. :)
> >> > > >
> >> > > > Cheers,
> >> > > >
> >> > > > Fang, Yan
> >> > > > [email protected]
> >> > > > +1 (206) 849-4108
> >> > > >
> >> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
> >> > > >
> >> > > > > Just tried #3. Changed the property file wikipedia-feed.properties:
> >> > > > >
> >> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >> > > > >
> >> > > > > Ran:
> >> > > > >
> >> > > > > deploy/samza/bin/run-job.sh
> >> > > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> >> > > > > --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
> >> > > > >
> >> > > > > I don't see any debug messages that I added to the feed or the
> >> > > > > parser file. I see messages on the kafka-consumer.
> >> > > > >
> >> > > > > However, the feed job died with the below message:
> >> > > > >
> >> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException: Trying
> >> > > > > to unlisten to a channel that has no listeners in it.
> >> > > > > at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
> >> > > > > at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
> >> > > > > at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> >> > > > > at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> >> > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> > > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> > > > > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >> > > > > at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
> >> > > > > at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
> >> > > > > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
> >> > > > > at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> >> > > > >
> >> > > > > - Shekar
