More questions, and please correct me if I am wrong. I am trying to unravel hello-samza
(http://samza.incubator.apache.org/learn/tutorials/0.7.0/run-hello-samza-without-internet.html):

1. wikipedia-feed.properties deploys a Samza job that listens to the Wikipedia API, receives the feed in real time, and produces it to the Kafka topic wikipedia-raw.
2. wikipedia-parser pulls messages from wikipedia-raw and wraps each incoming message in an event that checks its integrity. Why do we need to wrap the message in an event?

    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);

- Shekar

On Mon, Sep 8, 2014 at 10:54 AM, Chris Riccomini <[email protected]> wrote:

> Hey Shekar,
>
> Sure. If your input stream has 8 partitions and is partitioned by "ip
> address", then your state stream must also have 8 partitions and be
> partitioned by "ip address". This is to guarantee that the StreamTask that
> receives a message from the stream will have the state required to do the
> table join in its local store.
>
> Cheers,
> Chris
>
> On 9/8/14 10:51 AM, "Shekar Tippur" <[email protected]> wrote:
>
>> Chris -
>> Can you please elaborate on
>>
>> "Also note that if you take this approach, your state
>> must be partitioned in the same way as your input stream."
>>
>> - Shekar
>>
>> On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini <[email protected]> wrote:
>>
>>> Hey Shekar,
>>>
>>> You can either run some external DB that holds the data set, and you can
>>> query it from a StreamTask, or you can use Samza's state store feature to
>>> push data into a stream that you can then store in a partitioned
>>> key-value store along with your StreamTasks.
>>> There is some documentation here about the state store approach:
>>>
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>>>
>>> Putting your data into a Kafka stream is going to require more up-front
>>> effort from you, since you'll have to understand how Kafka's partitioning
>>> model works, and set up some pipeline to push the updates for your state.
>>> In the long run, I believe it's the better approach, though. Local lookups
>>> on a key-value store should be faster than doing remote RPC calls to a DB
>>> for every message. Also note that if you take this approach, your state
>>> must be partitioned in the same way as your input stream.
>>>
>>> I'm sorry I can't give you a more definitive answer. It's really about
>>> trade-offs.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am able to read messages off of a new Kafka queue now.
>>>> The next task is to enrich the data with more information. The data
>>>> that is flowing in has an IP address or host name. I have a Redis cache
>>>> with more contextual information (like owner of the alert, SLA, etc.).
>>>> The data in Redis does not change often.
>>>> This pretty much becomes a stream-table join.
>>>> I can also dump the same data to a different Kafka queue and make it a
>>>> stream-stream join as well.
>>>>
>>>> What do you guys recommend?
>>>>
>>>> - Shekar
>>>>
>>>> On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <[email protected]> wrote:
>>>>
>>>>> Hey Guys,
>>>>>
>>>>> I don't know a whole lot about Fluentd, but if you don't want to do
>>>>> this flow:
>>>>>
>>>>> Fluentd -> Kafka -> Samza
>>>>>
>>>>> Then the alternative is:
>>>>>
>>>>> Fluentd -> Samza
>>>>>
>>>>> The "direct" approach (no Kafka) is going to be pretty labor intensive
>>>>> to build. You'd have to:
>>>>>
>>>>> 1. Implement a FluentdSystemConsumer for Samza.
>>>>> 2. Write a Fluentd data output plugin, which sends to the
>>>>>    FluentdSystemConsumer.
>>>>> 3. Figure out a way for the Fluentd data output plugin to "discover"
>>>>>    where the Samza FluentdSystemConsumer is located (since
>>>>>    SamzaContainers are deployed to dynamic hosts in YARN, and move
>>>>>    around a lot).
>>>>> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
>>>>>    class (similar to the WikipediaSystemFactory in hello-samza).
>>>>> 5. Decide on some partitioning model that makes sense for Fluentd.
>>>>>    Maybe one partition = one host? Not sure how Fluentd works here.
>>>>>
>>>>> My instinct is that it's going to be *far* better to use the first
>>>>> approach (pipe the Fluentd events into Kafka). This will give you all
>>>>> of the semantics that Kafka provides (e.g. ordering within a partition,
>>>>> rewinding streams, durability, etc.).
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>>>>>
>>>>>> I also was thinking of having fluentd push to Samza, but I don't know
>>>>>> how to implement this. Not sure if adding a Kafka layer between Samza
>>>>>> and fluentd is the only option.
>>>>>>
>>>>>> Do other guys have better ideas?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Fang, Yan
>>>>>> [email protected]
>>>>>> +1 (206) 849-4108
>>>>>>
>>>>>> On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
>>>>>>
>>>>>>> Yan,
>>>>>>>
>>>>>>> Won't it add an additional hop? It did occur to me earlier, but I was
>>>>>>> not sure if this is the right way to go if we have a stringent,
>>>>>>> SLA-driven system depending on it.
>>>>>>>
>>>>>>> - Shekar
>>>>>>>
>>>>>>> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
>>>>>>>
>>>>>>>> If you have already put the events into Kafka, you can have Samza
>>>>>>>> accept the Kafka topic, like the wikipedia-parser project in
>>>>>>>> hello-samza accepts the Kafka topic wikipedia-raw (see the config
>>>>>>>> file).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Fang, Yan
>>>>>>>> [email protected]
>>>>>>>> +1 (206) 849-4108
>>>>>>>>
>>>>>>>> On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Awesome .. This works. Thanks a lot.
>>>>>>>>>
>>>>>>>>> Now off to my next step.
>>>>>>>>> I want to point to an incoming stream of events. These events are
>>>>>>>>> routed via fluentd. So, fluentd acts as a routing layer where it
>>>>>>>>> pushes the events to Kafka. Since it is a push and not a pull, any
>>>>>>>>> pointers on how to push it to Samza? Guessing I need a listener on
>>>>>>>>> Samza to collect this?
>>>>>>>>>
>>>>>>>>> - Shekar
>>>>>>>>>
>>>>>>>>> On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Aha, yes, we are almost there. I think I made a mistake in the
>>>>>>>>>> previous email.
>>>>>>>>>>
>>>>>>>>>> 1. Modify *wikipedia-parser.properties*, NOT *wikipedia-feed.properties*.
>>>>>>>>>> 2. Run:
>>>>>>>>>>
>>>>>>>>>>    deploy/samza/bin/run-job.sh \
>>>>>>>>>>      --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
>>>>>>>>>>      --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
>>>>>>>>>>
>>>>>>>>>>    (NOT wikipedia-feed.properties)
>>>>>>>>>>
>>>>>>>>>> Then you should see the messages in the Kafka topic *wikipedia-edits*.
>>>>>>>>>>
>>>>>>>>>> Thanks. Let me know if you have any luck. :)
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Fang, Yan
>>>>>>>>>> [email protected]
>>>>>>>>>> +1 (206) 849-4108
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just tried #3. Changed the property file wikipedia-feed.properties:
>>>>>>>>>>>
>>>>>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>>>>>>
>>>>>>>>>>> Ran:
>>>>>>>>>>>
>>>>>>>>>>> deploy/samza/bin/run-job.sh \
>>>>>>>>>>>   --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
>>>>>>>>>>>   --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
>>>>>>>>>>>
>>>>>>>>>>> I don't see any debug messages that I added to the feed or the
>>>>>>>>>>> parser file.
>>>>>>>>>>> I see messages on the kafka-consumer.
>>>>>>>>>>>
>>>>>>>>>>> However, the feed job died with the below message:
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "ThreadJob" java.lang.RuntimeException: Trying
>>>>>>>>>>> to unlisten to a channel that has no listeners in it.
>>>>>>>>>>>   at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
>>>>>>>>>>>   at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
>>>>>>>>>>>   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>>>>>>>>>>>   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>>>>>>>>>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>>>>>>>>>>>   at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
>>>>>>>>>>>   at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
>>>>>>>>>>>   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
>>>>>>>>>>>   at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>>>>>>>>>>>
>>>>>>>>>>> - Shekar
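Chris's co-partitioning requirement earlier in the thread can be illustrated with a small sketch. This is not Samza or Kafka code; it only assumes Kafka-style key-based partitioning (roughly hash(key) mod partition count). If the input stream and the state stream use the same key ("ip address") and the same partition count, a message and its state always land in the same partition, so the one StreamTask that owns that partition can do the table join locally:

```java
// Sketch of why state must be partitioned like the input stream.
// Hypothetical class; the partition function is a simplified stand-in for
// Kafka-style key hashing, not the actual Kafka partitioner.
public class CoPartitioning {
    // Route a keyed message to one of numPartitions partitions.
    static int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8; // must be the same for both streams
        String ip = "10.0.0.42";

        // The alert keyed by this IP and the state update keyed by the same
        // IP hash to the same partition, so the same StreamTask sees both.
        int inputPartition = partition(ip, numPartitions);
        int statePartition = partition(ip, numPartitions);
        System.out.println(inputPartition == statePartition); // prints: true
    }
}
```

If the two streams had different partition counts or keys, the state for an IP could live in a task that never receives that IP's messages, which is exactly the failure Chris's rule prevents.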
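The stream-table join Shekar describes (enriching alerts that carry an IP address with owner/SLA context from Redis) can be sketched with a plain HashMap standing in for Samza's local key-value store. The class and method names here are hypothetical illustrations, not part of the Samza API; in a real job the store would be populated from a Kafka stream of context updates:

```java
import java.util.HashMap;
import java.util.Map;

// Stream-table join sketch: a HashMap stands in for the partitioned local
// key-value store, so each lookup is in-process instead of a remote Redis
// RPC per message. (Hypothetical names; not the Samza StreamTask API.)
public class EnrichmentJoin {
    // Local "table": IP -> contextual info (owner, SLA, ...).
    private final Map<String, String> context = new HashMap<>();

    // Apply a context update (in Samza this would arrive on a state stream).
    void putContext(String ip, String info) {
        context.put(ip, info);
    }

    // Enrich one stream message; unknown IPs pass through unchanged.
    String enrich(String ip, String alert) {
        String info = context.get(ip);
        return info == null ? alert : alert + " [" + info + "]";
    }

    public static void main(String[] args) {
        EnrichmentJoin join = new EnrichmentJoin();
        join.putContext("10.0.0.42", "owner=ops, sla=gold");
        System.out.println(join.enrich("10.0.0.42", "disk full"));
        // prints: disk full [owner=ops, sla=gold]
    }
}
```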
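The push-vs-pull mismatch discussed in the thread (fluentd pushes events out; a Samza consumer pulls them in) is essentially what the Kafka layer resolves. A minimal sketch, using a bounded in-memory queue as a stand-in for a Kafka topic (Kafka additionally gives ordering within a partition, durability, and rewind); the class name is hypothetical:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A buffer between a pushing producer (fluentd) and a pulling consumer
// (Samza): the producer never needs to discover where the consumer runs,
// and the consumer reads at its own pace. Stand-in for a Kafka topic.
public class PushPullBuffer {
    private final BlockingQueue<String> topic = new ArrayBlockingQueue<>(1024);

    // fluentd side: push an event; returns false when the buffer is full,
    // giving backpressure instead of a lost event.
    public boolean push(String event) {
        return topic.offer(event);
    }

    // Samza side: pull the next event, or null if none is available yet.
    public String pull() {
        return topic.poll();
    }

    public static void main(String[] args) {
        PushPullBuffer buffer = new PushPullBuffer();
        buffer.push("login from 10.0.0.42");
        System.out.println(buffer.pull()); // prints: login from 10.0.0.42
    }
}
```

This is why the Fluentd -> Kafka -> Samza flow avoids the "discovery" problem in step 3 of Chris's list: the moving SamzaContainers only ever contact the broker, never fluentd directly.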
