Chris - Can you please elaborate on "Also note that if you take this approach, your state must be partitioned in the same way as your input stream."
- Shekar

On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini <[email protected]> wrote:

> Hey Shekar,
>
> You can either run some external DB that holds the data set, and you can
> query it from a StreamTask, or you can use Samza's state store feature to
> push data into a stream that you can then store in a partitioned key-value
> store along with your StreamTasks. There is some documentation here about
> the state store approach:
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>
> Putting your data into a Kafka stream is going to require more up-front
> effort from you, since you'll have to understand how Kafka's partitioning
> model works, and set up some pipeline to push the updates for your state.
> In the long run, I believe it's the better approach, though. Local lookups
> on a key-value store should be faster than doing remote RPC calls to a DB
> for every message. Also note that if you take this approach, your state
> must be partitioned in the same way as your input stream.
>
> I'm sorry I can't give you a more definitive answer. It's really about
> trade-offs.
>
> Cheers,
> Chris
>
> On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:
>
> > Hello,
> >
> > I am able to read messages off of a new kafka queue now.
> > The next task is to enrich the data with more information. The data
> > that is flowing in has an ip address or host name. I have a redis
> > cache where there is more contextual information (like owner of the
> > alert, SLA, etc.). The data in redis does not change often.
> > Pretty much becomes a stream-table join.
> > I can also dump the same data to a different kafka queue and make it
> > a stream-stream join as well.
> >
> > What do you guys recommend?
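The co-partitioning point Chris makes above can be sketched quickly. The snippet below is a simplified stand-in for Kafka's hash-based default partitioner (the real one hashes the serialized key with murmur2), and the host key is hypothetical:

```java
// Simplified illustration: if the input topic and the topic feeding the
// state store use the same key and the same partition count, a key's
// messages and its state land in the same partition, hence in the same
// StreamTask. (Stand-in for Kafka's default partitioner, which really
// uses murmur2 on the serialized key.)
public class CoPartitionDemo {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is non-negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 8;                  // same count for both topics
        String host = "web-42.example.com";  // hypothetical join key

        int inputPartition = partitionFor(host, partitions);
        int statePartition = partitionFor(host, partitions);

        // Same key + same partition count => same partition, so the task
        // consuming this host's events also owns this host's state.
        System.out.println(inputPartition == statePartition); // prints "true"
    }
}
```

If the two topics had different partition counts (or different keys), the modulo would diverge and a task could receive events for keys whose state lives in another container; that is the constraint behind "partitioned in the same way as your input stream".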
> >
> > - Shekar
> >
> > On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <[email protected]> wrote:
> >
> > > Hey Guys,
> > >
> > > I don't know a whole lot about Fluentd, but if you don't want to do
> > > this flow:
> > >
> > > Fluentd -> Kafka -> Samza
> > >
> > > Then the alternative is:
> > >
> > > Fluentd -> Samza
> > >
> > > The "direct" approach (no Kafka) is going to be pretty labor-intensive
> > > to build. You'd have to:
> > >
> > > 1. Implement a FluentdSystemConsumer for Samza.
> > > 2. Write a Fluentd data output plugin, which sends to the
> > >    FluentdSystemConsumer.
> > > 3. Figure out a way for the Fluentd data output plugin to "discover"
> > >    where the Samza FluentdSystemConsumer is located (since
> > >    SamzaContainers are deployed to dynamic hosts in YARN, and move
> > >    around a lot).
> > > 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
> > >    class (similar to the WikipediaSystemFactory in hello-samza).
> > > 5. Decide on some partitioning model that makes sense for Fluentd.
> > >    Maybe one partition = one host? Not sure how Fluentd works here.
> > >
> > > My instinct is that it's going to be *far* better to use the first
> > > approach (pipe the Fluentd events into Kafka). This will give you all
> > > of the semantics that Kafka provides (e.g. ordering within a
> > > partition, rewinding streams, durability, etc.).
> > >
> > > Cheers,
> > > Chris
> > >
> > > On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
> > >
> > > > I was also thinking of having fluentd push to Samza, but don't know
> > > > how to implement this. Not sure if adding a kafka layer between
> > > > Samza and fluentd is the only option.
> > > >
> > > > Do other guys have better ideas?
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > [email protected]
> > > > +1 (206) 849-4108
> > > >
> > > > On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
> > > >
> > > > > Yan,
> > > > >
> > > > > Won't it add an additional hop?
> > > > > It did occur to me earlier, but I was not sure if this is the
> > > > > right way to go if we have a stringent SLA-driven system
> > > > > depending on it.
> > > > >
> > > > > - Shekar
> > > > >
> > > > > On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
> > > > >
> > > > > > If you already put the events into kafka, you can make Samza
> > > > > > accept the kafka topic, like the wikipedia-parse project in
> > > > > > hello-samza accepts the kafka topic wikipedia-raw (see the
> > > > > > config file).
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Fang, Yan
> > > > > > [email protected]
> > > > > > +1 (206) 849-4108
> > > > > >
> > > > > > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
> > > > > >
> > > > > > > Awesome .. This works. Thanks a lot.
> > > > > > >
> > > > > > > Now off to my next step.
> > > > > > > I want to point to an incoming stream of events. These events
> > > > > > > are routed via fluentd. So, fluentd acts as a routing layer
> > > > > > > where it pushes the events to kafka. Since it is a push and
> > > > > > > not a pull, any pointers on how to push it to samza? Guessing
> > > > > > > I need a listener on Samza to collect this?
> > > > > > >
> > > > > > > - Shekar
> > > > > > >
> > > > > > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
> > > > > > >
> > > > > > > > Aha, yes, we are almost there. I think I made a mistake in
> > > > > > > > the previous email.
> > > > > > > >
> > > > > > > > 1. Modify *wikipedia-parser.properties*, NOT
> > > > > > > >    *wikipedia-feed.properties*.
> > > > > > > > 2. Run:
> > > > > > > >
> > > > > > > >    deploy/samza/bin/run-job.sh \
> > > > > > > >      --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
> > > > > > > >      --config-path=file://$PWD/deploy/samza/config/*wikipedia-parser.properties*
> > > > > > > >    (NOT *wikipedia-feed.properties*)
> > > > > > > >
> > > > > > > > Then you should see the messages in the kafka topic
> > > > > > > > *wikipedia-edits*.
> > > > > > > >
> > > > > > > > Thanks. Let me know if you have any luck. :)
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Fang, Yan
> > > > > > > > [email protected]
> > > > > > > > +1 (206) 849-4108
> > > > > > > >
> > > > > > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Just tried #3. Changed the property file
> > > > > > > > > wikipedia-feed.properties:
> > > > > > > > >
> > > > > > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
> > > > > > > > >
> > > > > > > > > Ran:
> > > > > > > > >
> > > > > > > > > deploy/samza/bin/run-job.sh \
> > > > > > > > >   --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
> > > > > > > > >   --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
> > > > > > > > >
> > > > > > > > > I don't see any debug messages that I added to the feed or
> > > > > > > > > the parser file.. I see messages on the kafka-consumer ..
> > > > > > > > >
> > > > > > > > > However, the feed job died with the below message:
> > > > > > > > >
> > > > > > > > > Exception in thread "ThreadJob" java.lang.RuntimeException:
> > > > > > > > > Trying to unlisten to a channel that has no listeners in it.
> > > > > > > > >     at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
> > > > > > > > >     at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
> > > > > > > > >     at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> > > > > > > > >     at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> > > > > > > > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > > > > > > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > > > > > > >     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > > > > > > >     at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
> > > > > > > > >     at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
> > > > > > > > >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
> > > > > > > > >     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > > > > > > > >
> > > > > > > > > - Shekar
