Thanks a lot for the continued support. I meant to send this email earlier.

I have the stream-to-table join working well. I am doing some performance
testing and building packages. I will fork and update GitHub with my changes
soon.

Thanks again.

- Shekar

On Tue, Sep 9, 2014 at 3:15 PM, Chris Riccomini <
[email protected]> wrote:

> Hey Shekar,
>
> Your understanding of the jobs is correct.
>
> The WikipediaFeedEvent class is just a convenience class so that
> StreamTask code doesn't have to deal with poking around inside the JSON
> structure. Instead, they get WikipediaFeedEvent events, which means the
> code can just call WikipediaFeedEvent.getChannel, etc.
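To make the convenience-class idea concrete, here is a minimal stand-alone sketch of what such a wrapper looks like: typed getters over the parsed JSON map, so StreamTask code never pokes at the map directly. The "channel" field matches the thread; the class name and everything else is illustrative, not the actual hello-samza schema.

```java
import java.util.Map;

// Hypothetical, simplified analogue of a convenience class like
// WikipediaFeedEvent: it wraps the parsed JSON and exposes typed getters,
// so task code calls getChannel() instead of casting map lookups itself.
public class FeedEvent {
    private final Map<String, ?> json;

    public FeedEvent(Map<String, ?> json) {
        this.json = json;
    }

    // Replaces (String) jsonObject.get("channel") scattered through task code.
    public String getChannel() {
        return (String) json.get("channel");
    }

    public static void main(String[] args) {
        FeedEvent event = new FeedEvent(Map.of("channel", "#en.wikipedia"));
        System.out.println(event.getChannel()); // prints "#en.wikipedia"
    }
}
```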
>
> Cheers,
> Chris
>
> On 9/9/14 2:58 PM, "Shekar Tippur" <[email protected]> wrote:
>
> >More questions ..
> >
> >Please correct me if I am wrong, as I am trying to unravel hello-samza:
> >
> >
> >http://samza.incubator.apache.org/learn/tutorials/0.7.0/run-hello-samza-without-internet.html
> >
> >1. wikipedia-feed.properties - deploys a Samza job which listens to the
> >Wikipedia API, receives the feed in real time, and produces the feed to
> >the Kafka topic wikipedia-raw.
> >2. wikipedia-parser - pulls the messages from wikipedia-raw and
> >registers to an event that checks the integrity of the incoming message.
> >Why do we need to register this to an event?
> >
> >WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >
> >
> >- Shekar
> >
> >On Mon, Sep 8, 2014 at 10:54 AM, Chris Riccomini <
> >[email protected]> wrote:
> >
> >> Hey Shekar,
> >>
> >> Sure. If your input stream has 8 partitions and is partitioned by "ip
> >> address", then your state stream must also have 8 partitions and be
> >> partitioned by "ip address". This is to guarantee that the StreamTask
> >> that receives a message from the stream will have the state required to
> >> do the table join in its local store.
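Chris's point above can be sketched in a few lines of plain Java. The hash here is a simplified stand-in for a hash partitioner, not Kafka's actual hashing; the idea is that the same key and the same partition count put the message and its state on the same partition, and therefore in the same task.

```java
// Sketch of why co-partitioning matters for a stream-table join: a keyed
// partitioner is roughly hash(key) % numPartitions, so matching keys in the
// input stream and the state stream only co-locate when both streams use the
// same key and the same partition count.
public class CoPartition {
    public static int partitionFor(String key, int numPartitions) {
        // Simplified stand-in for a hash partitioner; not Kafka's exact hash.
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        String ip = "10.0.0.42";
        int inputPartition = partitionFor(ip, 8); // partition in the input stream
        int statePartition = partitionFor(ip, 8); // partition in the state stream
        // Same key + same partition count => same partition, so the StreamTask
        // that receives the message also holds the state for that key locally.
        System.out.println(inputPartition == statePartition);
    }
}
```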
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 9/8/14 10:51 AM, "Shekar Tippur" <[email protected]> wrote:
> >>
> >> >Chris -
> >> >Can you please elaborate on
> >> >
> >> >"Also note that if you take this approach, your state
> >> >must be partitioned in the same way as your input stream."
> >> >
> >> >- Shekar
> >> >
> >> >On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini <
> >> >[email protected]> wrote:
> >> >
> >> >> Hey Shekar,
> >> >>
> >> >> You can either run some external DB that holds the data set, and you
> >>can
> >> >> query it from a StreamTask, or you can use Samza's state store
> >>feature
> >> >>to
> >> >> push data into a stream that you can then store in a partitioned
> >> >>key-value
> >> >> store along with your StreamTasks. There is some documentation here
> >> >>about
> >> >> the state store approach:
> >> >>
> >> >>http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >> >>
> >> >> Putting your data into a Kafka stream is going to require more up
> >> >> front effort from you, since you'll have to understand how Kafka's
> >> >> partitioning model works, and set up some pipeline to push the
> >> >> updates for your state. In the long run, I believe it's the better
> >> >> approach, though. Local lookups on a key-value store should be
> >> >> faster than doing remote RPC calls to a DB for every message. Also
> >> >> note that if you take this approach, your state must be partitioned
> >> >> in the same way as your input stream.
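The local-lookup join that paragraph describes can be sketched without any Samza dependency. In a real job the map below would be the task's partitioned key-value store, fed by a state topic; all class and method names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch of the stream-table join the state-store approach
// enables: state-stream updates are applied to a local key-value map, and
// each input message is enriched by a local get() instead of a remote DB
// call. In Samza proper, the map would be the task's local state store.
public class LocalJoin {
    private final Map<String, String> localStore = new HashMap<>();

    // Handling a message from the state stream: upsert context for a key.
    public void onStateMessage(String ip, String owner) {
        localStore.put(ip, owner);
    }

    // Handling a message from the input stream: enrich it from local state.
    public String onInputMessage(String ip, String alert) {
        String owner = localStore.getOrDefault(ip, "unknown");
        return alert + " owner=" + owner;
    }

    public static void main(String[] args) {
        LocalJoin join = new LocalJoin();
        join.onStateMessage("10.0.0.42", "team-a");
        System.out.println(join.onInputMessage("10.0.0.42", "disk full"));
    }
}
```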
> >> >>
> >> >> I'm sorry I can't give you a more definitive answer. It's really
> >> >> about trade-offs.
> >> >>
> >> >>
> >> >> Cheers,
> >> >> Chris
> >> >>
> >> >> On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:
> >> >>
> >> >> >Hello,
> >> >> >
> >> >> >I am able to read messages off of a new kafka queue now.
> >> >> >The next task is to enrich the data with more information. The data
> >> >> >that is flowing in has an ip address or host name. I have a redis
> >> >> >cache which holds more contextual information (like owner of the
> >> >> >alert, SLA, etc). The data in redis does not change often.
> >> >> >This pretty much becomes a stream-table join.
> >> >> >I can also dump the same data to a different kafka queue and make
> >> >> >it a stream-stream join as well.
> >> >> >
> >> >> >What do you guys recommend?
> >> >> >
> >> >> >- Shekar
> >> >> >
> >> >> >On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <
> >> >> >[email protected]> wrote:
> >> >> >
> >> >> >> Hey Guys,
> >> >> >>
> >> >> >> I don't know a whole lot about Fluentd, but if you don't want to
> >> >> >> do this flow:
> >> >> >>
> >> >> >>   Fluentd -> Kafka -> Samza
> >> >> >>
> >> >> >> Then the alternative is:
> >> >> >>
> >> >> >>   Fluentd -> Samza
> >> >> >>
> >> >> >> The "direct" approach (no Kafka) is going to be pretty labor
> >> >> >> intensive to build. You'd have to:
> >> >> >>
> >> >> >> 1. Implement a FluentdSystemConsumer for Samza.
> >> >> >> 2. Write a Fluentd data output plugin, which sends to the
> >> >> >> FluentdSystemConsumer.
> >> >> >> 3. Figure out a way for the Fluentd data output plugin to
> >> >> >> "discover" where the Samza FluentdSystemConsumer is located
> >> >> >> (since SamzaContainers are deployed to dynamic hosts in YARN, and
> >> >> >> move around a lot).
> >> >> >> 4. Implement a bare-bones FluentdSystemAdmin and
> >> >> >> FluentdSystemFactory class (similar to the WikipediaSystemFactory
> >> >> >> in hello-samza).
> >> >> >> 5. Decide on some partitioning model that makes sense for
> >> >> >> Fluentd. Maybe one partition = one host? Not sure how Fluentd
> >> >> >> works here.
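The heart of step 1 in that list, adapting a push source to Samza's pull-style consumer interface, can be sketched without any Samza classes. Samza's real helper for this pattern is BlockingEnvelopeMap; the class and method names below are illustrative, not Samza's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of bridging push to pull: the push side (e.g. a Fluentd output
// plugin handler) offers events into a bounded queue, and the pull side
// (what a SystemConsumer's poll would do) drains them in batches.
public class PushToPullBuffer {
    private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1000);

    // Called by the push side; returns false if the buffer is full.
    public boolean push(String event) {
        return buffer.offer(event);
    }

    // Called by the pull side; drains up to max buffered events.
    public List<String> poll(int max) {
        List<String> out = new ArrayList<>();
        buffer.drainTo(out, max);
        return out;
    }

    public static void main(String[] args) {
        PushToPullBuffer bridge = new PushToPullBuffer();
        bridge.push("event-1");
        bridge.push("event-2");
        System.out.println(bridge.poll(10).size());
    }
}
```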
> >> >> >>
> >> >> >> My instinct is that it's going to be *far* better to use the
> >> >> >> first approach (pipe the Fluentd events into Kafka). This will
> >> >> >> give you all of the semantics that Kafka provides (e.g. ordering
> >> >> >> within a partition, rewinding streams, durability, etc).
> >> >> >>
> >> >> >> Cheers,
> >> >> >> Chris
> >> >> >>
> >> >> >> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
> >> >> >>
> >> >> >> >I also was thinking of having fluentd push to Samza, but I
> >> >> >> >don't know how to implement this. Not sure if adding a kafka
> >> >> >> >layer between Samza and fluentd is the only option.
> >> >> >> >
> >> >> >> >Do other guys have better ideas?
> >> >> >> >
> >> >> >> >Thanks,
> >> >> >> >
> >> >> >> >Fang, Yan
> >> >> >> >[email protected]
> >> >> >> >+1 (206) 849-4108
> >> >> >> >
> >> >> >> >
> >> >> >> >On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur
> >><[email protected]>
> >> >> >>wrote:
> >> >> >> >
> >> >> >> >> Yan,
> >> >> >> >>
> >> >> >> >> Won't it add an additional hop? It did occur to me earlier,
> >> >> >> >> but I was not sure if this is the right way to go if we have a
> >> >> >> >> stringent SLA-driven system depending on it.
> >> >> >> >>
> >> >> >> >> - Shekar
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang
> >><[email protected]>
> >> >> >>wrote:
> >> >> >> >>
> >> >> >> >> > If you already put the events into kafka, you can make
> >> >> >> >> > Samza accept the kafka topic, like the wikipedia-parser
> >> >> >> >> > project in hello-samza accepts the kafka topic wikipedia-raw
> >> >> >> >> > (see the config file).
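For reference, the shape of that config, as I remember it from hello-samza's wikipedia-parser.properties; key names and values here are from memory and may differ in your checkout or Samza version:

```properties
# Hedged sketch of hello-samza's wikipedia-parser.properties (from memory).
job.name=wikipedia-parser
task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
# Consume the kafka topic that the wikipedia-feed job produces to:
task.inputs=kafka.wikipedia-raw
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.metadata.broker.list=localhost:9092
```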
> >> >> >> >> >
> >> >> >> >> > Thanks,
> >> >> >> >> >
> >> >> >> >> > Fang, Yan
> >> >> >> >> > [email protected]
> >> >> >> >> > +1 (206) 849-4108
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur
> >> >><[email protected]>
> >> >> >> >>wrote:
> >> >> >> >> >
> >> >> >> >> > > Awesome .. This works. Thanks a lot.
> >> >> >> >> > >
> >> >> >> >> > > Now off to my next step.
> >> >> >> >> > > I want to point to an incoming stream of events. These
> >> >> >> >> > > events are routed via fluentd. So, fluentd acts as a
> >> >> >> >> > > routing layer where it pushes the events to kafka. Since
> >> >> >> >> > > it is a push and not a pull, any pointers on how to push
> >> >> >> >> > > it to samza? Guessing I need a listener on Samza to
> >> >> >> >> > > collect this?
> >> >> >> >> > >
> >> >> >> >> > > - Shekar
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang
> >> >><[email protected]>
> >> >> >> >>wrote:
> >> >> >> >> > >
> >> >> >> >> > > > Aha, yes, we are almost there. I think I made a mistake
> >> >> >> >> > > > in the previous email.
> >> >> >> >> > > >
> >> >> >> >> > > > 1. modify *wikipedia-parser.properties*, NOT
> >> >> >> >> > > > *wikipedia-feed.properties*
> >> >> >> >> > > > 2. run deploy/samza/bin/run-job.sh
> >> >> >> >> > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> >> >> >> >> > > > --config-path=file://$PWD/deploy/samza/config/*wikipedia-parser.properties*
> >> >> >> >> > > > *(NOT *wikipedia-feed.properties*)*
> >> >> >> >> > > >
> >> >> >> >> > > > Then you should see the messages in the kafka topic
> >> >> >> >> > > > *wikipedia-edits*.
> >> >> >> >> > > >
> >> >> >> >> > > > Thanks. Let me know if you have any luck. :)
> >> >> >> >> > > >
> >> >> >> >> > > > Cheers,
> >> >> >> >> > > >
> >> >> >> >> > > > Fang, Yan
> >> >> >> >> > > > [email protected]
> >> >> >> >> > > > +1 (206) 849-4108
> >> >> >> >> > > >
> >> >> >> >> > > >
> >> >> >> >> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur
> >> >> >><[email protected]
> >> >> >> >
> >> >> >> >> > > wrote:
> >> >> >> >> > > >
> >> >> >> >> > > > > Just tried #3. Changed the property file
> >> >> >> >> > > > > wikipedia-feed.properties:
> >> >> >> >> > > > >
> >> >> >> >> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >> >> >> >> > > > >
> >> >> >> >> > > > > Ran:
> >> >> >> >> > > > >
> >> >> >> >> > > > > deploy/samza/bin/run-job.sh
> >> >> >> >> > > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> >> >> >> >> > > > > --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
> >> >> >> >> > > > >
> >> >> >> >> > > > > I don't see any debug messages that I added to the
> >> >> >> >> > > > > feed or the parser file. I see messages on the
> >> >> >> >> > > > > kafka-consumer.
> >> >> >> >> > > > >
> >> >> >> >> > > > > However the feed job died with the below message:
> >> >> >> >> > > > >
> >> >> >> >> > > > >
> >> >> >> >> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException:
> >> >> >> >> > > > > Trying to unlisten to a channel that has no listeners in it.
> >> >> >> >> > > > >
> >> >> >> >> > > > > at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
> >> >> >> >> > > > > at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
> >> >> >> >> > > > > at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> >> >> >> >> > > > > at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> >> >> >> >> > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >> >> >> > > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >> >> >> > > > > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >> >> >> >> > > > > at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
> >> >> >> >> > > > > at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
> >> >> >> >> > > > > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
> >> >> >> >> > > > > at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> >> >> >> >> > > > > - Shekar
> >> >> >> >> > > > >
> >> >> >> >> > > >
> >> >> >> >> > >
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >> >>
> >>
> >>
>
>
