Thanks a lot for the continued support. I meant to send this email earlier.
I have the stream-to-table join working well. I am doing some performance
testing and building packages, and will fork and update GitHub with my
changes soon. Thanks again.

- Shekar

On Tue, Sep 9, 2014 at 3:15 PM, Chris Riccomini <[email protected]> wrote:

> Hey Shekar,
>
> Your understanding of the jobs is correct.
>
> The WikipediaFeedEvent class is just a convenience class so that
> StreamTask code doesn't have to deal with poking around inside the JSON
> structure. Instead, it gets WikipediaFeedEvent events, which means the
> code can just call WikipediaFeedEvent.getChannel, etc.
>
> Cheers,
> Chris
>
> On 9/9/14 2:58 PM, "Shekar Tippur" <[email protected]> wrote:
>
>> More questions ..
>>
>> Please correct me if I am wrong, as I am trying to unravel hello-samza:
>> http://samza.incubator.apache.org/learn/tutorials/0.7.0/run-hello-samza-without-internet.html
>>
>> 1. wikipedia-feed.properties - deploys a Samza job which listens to the
>> Wikipedia API, receives the feed in real time, and produces the feed to
>> the Kafka topic wikipedia-raw.
>> 2. wikipedia-parser - pulls the messages from wikipedia-raw and
>> registers to an event that checks the integrity of the incoming message.
>> Why do we need to register this to an event?
>>
>> WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>
>> - Shekar
>>
>> On Mon, Sep 8, 2014 at 10:54 AM, Chris Riccomini
>> <[email protected]> wrote:
>>
>>> Hey Shekar,
>>>
>>> Sure. If your input stream has 8 partitions and is partitioned by "ip
>>> address", then your state stream must also have 8 partitions and be
>>> partitioned by "ip address". This is to guarantee that the StreamTask
>>> that receives a message from the stream will have the state required to
>>> do the table join in its local store.
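Chris's partitioning point can be sketched in plain Java. This is not the real Samza API (in Samza the store is a changelog-backed key-value store obtained from the task context, and Kafka does the partition assignment); it is just a minimal, self-contained simulation showing why the input stream and the state stream must be partitioned by the same key:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a partitioned stream-table join: each "task" owns one
// partition of the input stream and a local store holding the same
// partition of the table. Partitioning both streams by the same key
// (here, "ip address") guarantees every lookup is local.
public class StreamTableJoinSketch {
    static final int NUM_PARTITIONS = 8;

    // The same partition function must be applied to both streams.
    static int partitionFor(String key) {
        return Math.abs(key.hashCode() % NUM_PARTITIONS);
    }

    // One local key-value store per task/partition (stands in for
    // Samza's local store fed by the state stream).
    private final Map<String, String>[] localStores;

    @SuppressWarnings("unchecked")
    public StreamTableJoinSketch() {
        localStores = new Map[NUM_PARTITIONS];
        for (int i = 0; i < NUM_PARTITIONS; i++) {
            localStores[i] = new HashMap<>();
        }
    }

    // A message on the "table" (state) stream updates the local store.
    public void onTableUpdate(String ip, String context) {
        localStores[partitionFor(ip)].put(ip, context);
    }

    // A message on the input stream is enriched by a purely local lookup.
    public String onInputMessage(String ip, String alert) {
        String context = localStores[partitionFor(ip)].get(ip);
        return alert + " [" + (context == null ? "no context" : context) + "]";
    }
}
```

If the two streams were partitioned differently, the update and the lookup for the same IP could land in different tasks and the local get would miss.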
>>> Cheers,
>>> Chris
>>>
>>> On 9/8/14 10:51 AM, "Shekar Tippur" <[email protected]> wrote:
>>>
>>>> Chris -
>>>> Can you please elaborate on
>>>>
>>>> "Also note that if you take this approach, your state
>>>> must be partitioned in the same way as your input stream."
>>>>
>>>> - Shekar
>>>>
>>>> On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini
>>>> <[email protected]> wrote:
>>>>
>>>>> Hey Shekar,
>>>>>
>>>>> You can either run some external DB that holds the data set, and you
>>>>> can query it from a StreamTask, or you can use Samza's state store
>>>>> feature to push data into a stream that you can then store in a
>>>>> partitioned key-value store along with your StreamTasks. There is some
>>>>> documentation here about the state store approach:
>>>>>
>>>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>>>>>
>>>>> Putting your data into a Kafka stream is going to require more up
>>>>> front effort from you, since you'll have to understand how Kafka's
>>>>> partitioning model works, and set up some pipeline to push the updates
>>>>> for your state. In the long run, I believe it's the better approach,
>>>>> though. Local lookups on a key-value store should be faster than doing
>>>>> remote RPC calls to a DB for every message. Also note that if you take
>>>>> this approach, your state must be partitioned in the same way as your
>>>>> input stream.
>>>>>
>>>>> I'm sorry I can't give you a more definitive answer. It's really about
>>>>> trade-offs.
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am able to read messages off of a new Kafka queue now.
>>>>>> The next task is to enrich the data with more information.
>>>>>> The data that is flowing in has an IP address or host name. I have a
>>>>>> Redis cache with more contextual information (like owner of the
>>>>>> alert, SLA, etc.). The data in Redis does not change often.
>>>>>> It pretty much becomes a stream-table join.
>>>>>> I can also dump the same data to a different Kafka queue and make it
>>>>>> a stream-stream join as well.
>>>>>>
>>>>>> What do you guys recommend?
>>>>>>
>>>>>> - Shekar
>>>>>>
>>>>>> On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Guys,
>>>>>>>
>>>>>>> I don't know a whole lot about Fluentd, but if you don't want to do
>>>>>>> this flow:
>>>>>>>
>>>>>>> Fluentd -> Kafka -> Samza
>>>>>>>
>>>>>>> Then the alternative is:
>>>>>>>
>>>>>>> Fluentd -> Samza
>>>>>>>
>>>>>>> The "direct" approach (no Kafka) is going to be pretty labor
>>>>>>> intensive to build. You'd have to:
>>>>>>>
>>>>>>> 1. Implement a FluentdSystemConsumer for Samza.
>>>>>>> 2. Write a Fluentd data output plugin, which sends to the
>>>>>>> FluentdSystemConsumer.
>>>>>>> 3. Figure out a way for the Fluentd data output plugin to "discover"
>>>>>>> where the Samza FluentdSystemConsumer is located (since
>>>>>>> SamzaContainers are deployed to dynamic hosts in YARN, and move
>>>>>>> around a lot).
>>>>>>> 4. Implement a bare-bones FluentdSystemAdmin and
>>>>>>> FluentdSystemFactory class (similar to the WikipediaSystemFactory in
>>>>>>> hello-samza).
>>>>>>> 5. Decide on some partitioning model that makes sense for Fluentd.
>>>>>>> Maybe one partition = one host? Not sure how Fluentd works here.
>>>>>>>
>>>>>>> My instinct is that it's going to be *far* better to use the first
>>>>>>> approach (pipe the Fluentd events into Kafka).
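For the Fluentd -> Kafka -> Samza flow, the Samza side is just a job whose input is the Kafka topic that Fluentd writes to. A hedged sketch of what such a config might look like, modeled on hello-samza's wikipedia-parser.properties (the job name, topic name, and task class here are illustrative assumptions, not part of the original thread):

```properties
# Hypothetical job consuming Fluentd events from Kafka
# (names are illustrative; keys follow hello-samza's 0.7.0 config files)
job.factory.class=org.apache.samza.job.local.LocalJobFactory
job.name=fluentd-parser

task.class=samza.examples.fluentd.task.FluentdParserStreamTask
task.inputs=kafka.fluentd-events

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.metadata.broker.list=localhost:9092
```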
>>>>>>> This will give you all of the semantics that Kafka provides (e.g.
>>>>>>> ordering within a partition, rewinding streams, durability, etc.).
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>>>>>>>
>>>>>>>> I also was thinking of having Fluentd push to Samza, but don't know
>>>>>>>> how to implement this. Not sure if adding a Kafka layer between
>>>>>>>> Samza and Fluentd is the only option.
>>>>>>>>
>>>>>>>> Do other guys have better ideas?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Fang, Yan
>>>>>>>> [email protected]
>>>>>>>> +1 (206) 849-4108
>>>>>>>>
>>>>>>>> On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yan,
>>>>>>>>>
>>>>>>>>> Won't it add an additional hop? It did occur to me earlier, but I
>>>>>>>>> was not sure if this is the right way to go if we have a stringent
>>>>>>>>> SLA-driven system depending on it.
>>>>>>>>>
>>>>>>>>> - Shekar
>>>>>>>>>
>>>>>>>>> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> If you already put the events into Kafka, you can make the Samza
>>>>>>>>>> job accept the Kafka topic, like the wikipedia-parser project in
>>>>>>>>>> hello-samza accepts the Kafka topic wikipedia-raw (see the config
>>>>>>>>>> file).
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Fang, Yan
>>>>>>>>>> [email protected]
>>>>>>>>>> +1 (206) 849-4108
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Awesome .. This works. Thanks a lot.
> >> >> >> >> > > > >> >> >> >> > > Now off to my next step. > >> >> >> >> > > I want to point to an incoming stream of events. These > >>events > >> >>are > >> >> >> >> routed > >> >> >> >> > > via fluentd. So, fluentd acts as a routing layer where it > >> >>pushes > >> >> >>the > >> >> >> >> > events > >> >> >> >> > > to kafka. Since it is a push and not a pull, any pointers > >>on > >> >>how > >> >> >>to > >> >> >> >> push > >> >> >> >> > it > >> >> >> >> > > to samza? Guessing I need a listener on Samza to collect > >>this? > >> >> >> >> > > > >> >> >> >> > > - Shekar > >> >> >> >> > > > >> >> >> >> > > > >> >> >> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang > >> >><[email protected]> > >> >> >> >>wrote: > >> >> >> >> > > > >> >> >> >> > > > Aha, yes, we are almost there. I think I made a mistake > >>in > >> >>the > >> >> >> >> previous > >> >> >> >> > > > email. > >> >> >> >> > > > > >> >> >> >> > > > 1. modify the *wikipedia-parser.properties , NOT > >> >> >> >> > > > *wikipedia-feed.properties > >> >> >> >> > > > 2. run deploy/samza/bin/run-job.sh > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>--config-factory=org.apache.samza.config.factories.PropertiesConfig > >>>>>>>>Fa > >> >>>>>>ct > >> >> >>>>or > >> >> >> >>y > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>--config-path=file://$PWD/deploy/samza/config/* > wikipedia-parser.pro > >>>>>>>>pe > >> >>>>>>rt > >> >> >>>>ie > >> >> >> >>s* > >> >> >> >> > > > *(NOT *wikipedia-feed,properties*)* > >> >> >> >> > > > > >> >> >> >> > > > Then you should see the messages in the kafka topic, > >> >> >> >> *wikipedia-edits* > >> >> >> >> > > > > >> >> >> >> > > > Thanks. Let me know if you have any luck . 
>>>>>>>>>>>> :)
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Fang, Yan
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>> +1 (206) 849-4108
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur
>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Just tried #3. Changed the property file
>>>>>>>>>>>>> wikipedia-feed.properties:
>>>>>>>>>>>>>
>>>>>>>>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ran:
>>>>>>>>>>>>>
>>>>>>>>>>>>> deploy/samza/bin/run-job.sh
>>>>>>>>>>>>> --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>>>>>>>>>>>>> --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't see any debug messages that I added to the feed or the
>>>>>>>>>>>>> parser file. I do see messages on the kafka-consumer.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, the feed job died with the message below:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Exception in thread "ThreadJob" java.lang.RuntimeException:
>>>>>>>>>>>>> Trying to unlisten to a channel that has no listeners in it.
> >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > > >> >> >> >> > > > > > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFee > >>>>>>>>d. > >> >>>>>>ja > >> >> >>>>va > >> >> >> >>:98) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > > >> >> >> >> > > > > > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaCon > >>>>>>>>su > >> >>>>>>me > >> >> >>>>r. > >> >> >> >>java:72) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > > >> >> >> >> > > > > > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(Syste > >>>>>>>>mC > >> >>>>>>on > >> >> >>>>su > >> >> >> >>mers.scala:152) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > > >> >> >> >> > > > > > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(Syste > >>>>>>>>mC > >> >>>>>>on > >> >> >>>>su > >> >> >> >>mers.scala:152) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >>scala.collection.Iterator$class.foreach(Iterator.scala:727) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >>scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > > >> >> >> >> > > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scal > >>>>>>>>a: > >> >>>>>>20 > >> >> >>>>6) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > >> >> >> >> > >> >> > >> > >>>>>>org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:15 > >>>>>>2) > >> >> >> >> > > > > 
> >> >> >> >> > > > > at > >> >> >> >> > > > > > >> >> >> >> > > > > > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> > >> > >>>>>>>>org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaCo > >>>>>>>>nt > >> >>>>>>ai > >> >> >>>>ne > >> >> >> >>r.scala:587) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > > >> >> >> >> > >> >> > >> > >>>>>>org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:51 > >>>>>>2) > >> >> >> >> > > > > > >> >> >> >> > > > > at > >> >> >> >> > > > >> >> > >>>>org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42) > >> >> >> >> > > > > > >> >> >> >> > > > > - Shekar > >> >> >> >> > > > > > >> >> >> >> > > > > >> >> >> >> > > > >> >> >> >> > > >> >> >> >> > >> >> >> > >> >> >> > >> >> > >> >> > >> > >> > >
