Hey Shekar, Sure. If your input stream has 8 partitions and is partitioned by "ip address", then your state stream must also have 8 partitions and be partitioned by "ip address". This is to guarantee that the StreamTask that receives a message from the stream will have the state required to do the table join in its local store.
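Chris's co-partitioning requirement boils down to: if both topics are keyed the same way and have the same partition count, a given key always lands in the same partition number in both, so the task that owns partition N of the input also owns partition N of the state. A minimal self-contained sketch (plain Java; the key, partition count, and hash scheme are illustrative, in the spirit of Kafka's default keyed partitioner, not Samza's actual code):

```java
// Sketch: why co-partitioning makes a local stream-table join possible.
// Assumes both streams use the same keyed partitioner (hash(key) mod
// numPartitions) and, critically, the SAME partition count.
public class CoPartitioning {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is always non-negative
        // (Math.abs would overflow for Integer.MIN_VALUE).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 8;          // both the input stream and the state stream
        String ip = "10.0.0.42";     // the join key ("ip address")

        int inputPartition = partitionFor(ip, partitions);
        int statePartition = partitionFor(ip, partitions);

        // Same key + same partitioner + same partition count
        // => same partition number, so the table lookup is purely local.
        assert inputPartition == statePartition;

        // With mismatched partition counts the guarantee breaks:
        // partitionFor(ip, 8) need not equal partitionFor(ip, 16).
        System.out.println("ip " + ip + " -> partition " + inputPartition);
    }
}
```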
Cheers,
Chris

On 9/8/14 10:51 AM, "Shekar Tippur" <[email protected]> wrote:

>Chris -
>Can you please elaborate on
>
>"Also note that if you take this approach, your state
>must be partitioned in the same way as your input stream."
>
>- Shekar
>
>On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini <[email protected]> wrote:
>
>> Hey Shekar,
>>
>> You can either run an external DB that holds the data set and query it
>> from a StreamTask, or you can use Samza's state store feature to push the
>> data into a stream that you can then store in a partitioned key-value
>> store along with your StreamTasks. There is some documentation about the
>> state store approach here:
>>
>> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>>
>> Putting your data into a Kafka stream is going to require more up-front
>> effort from you, since you'll have to understand how Kafka's partitioning
>> model works, and set up a pipeline to push the updates for your state. In
>> the long run, I believe it's the better approach, though. Local lookups
>> on a key-value store should be faster than doing remote RPC calls to a DB
>> for every message. Also note that if you take this approach, your state
>> must be partitioned in the same way as your input stream.
>>
>> I'm sorry I can't give you a more definitive answer. It's really about
>> trade-offs.
>>
>> Cheers,
>> Chris
>>
>> On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> I am able to read messages off of a new kafka queue now.
>>> The next task is to enrich the data with more information. The data
>>> that is flowing in has an ip address or host name. I have a redis cache
>>> where there is more contextual information (like owner of the alert,
>>> SLA, etc.). The data in redis does not change often.
>>> Pretty much becomes a stream-table join.
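For reference, wiring a partitioned key-value store like the one Chris describes is done in the job's properties file. The fragment below is a hedged sketch: the store name, serdes, and changelog topic are illustrative, not from this thread; the `stores.*` keys follow the state-management docs linked above, so check them against your Samza version:

```properties
# Hypothetical store holding the per-ip context table (name "ip-context" is illustrative)
stores.ip-context.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.ip-context.key.serde=string
stores.ip-context.msg.serde=json
# Changelog topic so the store can be restored when a container moves hosts
stores.ip-context.changelog=kafka.ip-context-changelog
```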
>>> I can also dump the same data to a different kafka queue and make it a
>>> stream-stream join as well.
>>>
>>> What do you guys recommend?
>>>
>>> - Shekar
>>>
>>> On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <[email protected]> wrote:
>>>
>>>> Hey Guys,
>>>>
>>>> I don't know a whole lot about Fluentd, but if you don't want to do
>>>> this flow:
>>>>
>>>> Fluentd -> Kafka -> Samza
>>>>
>>>> Then the alternative is:
>>>>
>>>> Fluentd -> Samza
>>>>
>>>> The "direct" approach (no Kafka) is going to be pretty labor-intensive
>>>> to build. You'd have to:
>>>>
>>>> 1. Implement a FluentdSystemConsumer for Samza.
>>>> 2. Write a Fluentd data output plugin, which sends to the
>>>> FluentdSystemConsumer.
>>>> 3. Figure out a way for the Fluentd data output plugin to "discover"
>>>> where the Samza FluentdSystemConsumer is located (since SamzaContainers
>>>> are deployed to dynamic hosts in YARN, and move around a lot).
>>>> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
>>>> class (similar to the WikipediaSystemFactory in hello-samza).
>>>> 5. Decide on some partitioning model that makes sense for Fluentd.
>>>> Maybe one partition = one host? Not sure how Fluentd works here.
>>>>
>>>> My instinct is that it's going to be *far* better to use the first
>>>> approach (pipe the Fluentd events into Kafka). This will give you all
>>>> of the semantics that Kafka provides (e.g. ordering within a partition,
>>>> rewinding streams, durability, etc.).
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>>>>
>>>>> I also was thinking of having fluentd push to Samza, but don't know
>>>>> how to implement this. Not sure if adding a kafka layer between Samza
>>>>> and fluentd is the only option.
>>>>>
>>>>> Do other guys have better ideas?
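For a sense of what the custom-system route in Chris's steps 1-5 would eventually look like, the job config would register the new system roughly as below. Every name here is hypothetical; it simply mirrors how hello-samza registers its WikipediaSystemFactory:

```properties
# Hypothetical: register the custom system from step 4 and consume from it
systems.fluentd.samza.factory=samza.examples.fluentd.system.FluentdSystemFactory
# The input stream name depends on the partitioning model chosen in step 5
task.inputs=fluentd.events
```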
>>>>> Thanks,
>>>>>
>>>>> Fang, Yan
>>>>> [email protected]
>>>>> +1 (206) 849-4108
>>>>>
>>>>> On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
>>>>>
>>>>>> Yan,
>>>>>>
>>>>>> Won't it add an additional hop? It did occur to me earlier, but I was
>>>>>> not sure if this is the right way to go if we have a stringent
>>>>>> SLA-driven system depending on it.
>>>>>>
>>>>>> - Shekar
>>>>>>
>>>>>> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
>>>>>>
>>>>>>> If you already put the events into kafka, you can make Samza accept
>>>>>>> the kafka topic, like the wikipedia-parse project in hello-samza
>>>>>>> accepts the kafka topic wikipedia-raw (see the config file).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Fang, Yan
>>>>>>> [email protected]
>>>>>>> +1 (206) 849-4108
>>>>>>>
>>>>>>> On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
>>>>>>>
>>>>>>>> Awesome .. this works. Thanks a lot.
>>>>>>>>
>>>>>>>> Now off to my next step.
>>>>>>>> I want to point to an incoming stream of events. These events are
>>>>>>>> routed via fluentd. So fluentd acts as a routing layer where it
>>>>>>>> pushes the events to kafka. Since it is a push and not a pull, any
>>>>>>>> pointers on how to push it to samza? Guessing I need a listener on
>>>>>>>> Samza to collect this?
>>>>>>>>
>>>>>>>> - Shekar
>>>>>>>>
>>>>>>>> On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Aha, yes, we are almost there. I think I made a mistake in the
>>>>>>>>> previous email.
>>>>>>>>>
>>>>>>>>> 1. Modify wikipedia-parser.properties, NOT wikipedia-feed.properties
>>>>>>>>> 2.
>>>>>>>>> run deploy/samza/bin/run-job.sh
>>>>>>>>> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>>>>>>>>> --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
>>>>>>>>> (NOT wikipedia-feed.properties)
>>>>>>>>>
>>>>>>>>> Then you should see the messages in the kafka topic wikipedia-edits.
>>>>>>>>>
>>>>>>>>> Thanks. Let me know if you have any luck. :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Fang, Yan
>>>>>>>>> [email protected]
>>>>>>>>> +1 (206) 849-4108
>>>>>>>>>
>>>>>>>>> On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Just tried #3. Changed the property file wikipedia-feed.properties:
>>>>>>>>>>
>>>>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>>>>>
>>>>>>>>>> Ran ..
>>>>>>>>>>
>>>>>>>>>> deploy/samza/bin/run-job.sh
>>>>>>>>>> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>>>>>>>>>> --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
>>>>>>>>>>
>>>>>>>>>> I don't see any of the debug messages that I added to the feed or
>>>>>>>>>> the parser file. I see messages on the kafka-consumer ..
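For context on the properties files being swapped above: they wire the job to its Kafka input, per Yan's earlier suggestion of pointing Samza at an existing topic. The fragment below is a rough sketch of that input wiring in the style of hello-samza's parser job; the topic name mirrors the thread, but the hostnames/ports assume the local quickstart setup and may differ in your deployment:

```properties
# Consume an existing Kafka topic (quickstart-style local endpoints assumed)
task.inputs=kafka.wikipedia-raw
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.metadata.broker.list=localhost:9092
```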
>>>>>>>>>> However, the feed job died with the below message:
>>>>>>>>>>
>>>>>>>>>> Exception in thread "ThreadJob" java.lang.RuntimeException: Trying
>>>>>>>>>> to unlisten to a channel that has no listeners in it.
>>>>>>>>>> at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
>>>>>>>>>> at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
>>>>>>>>>> at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>>>>>>>>>> at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>>>>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>>>>>>>>>> at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
>>>>>>>>>> at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
>>>>>>>>>> at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
>>>>>>>>>> at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>>>>>>>>>>
>>>>>>>>>> - Shekar
