Hello,

I am able to read messages off of a new Kafka topic now.
The next task is to enrich the data with more information. The data that is
flowing in has an IP address or host name. I have a Redis cache with more
contextual information (like owner of the alert, SLA, etc.). The data in
Redis does not change often, so this pretty much becomes a stream-table join.
I could also dump the same data to a different Kafka topic and make it a
stream-stream join instead.
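
For the Redis-lookup (stream-table) variant, this is roughly what I have in
mind. Just a sketch: it assumes a JSON serde that hands the task a Map, uses
Jedis for the lookup, and the topic/key/field names are placeholders:

  import java.util.Map;
  import org.apache.samza.system.IncomingMessageEnvelope;
  import org.apache.samza.system.OutgoingMessageEnvelope;
  import org.apache.samza.system.SystemStream;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.StreamTask;
  import org.apache.samza.task.TaskCoordinator;
  import redis.clients.jedis.Jedis;

  public class AlertEnrichmentTask implements StreamTask {
    // Redis holds the slowly-changing context (owner, SLA, ...) keyed by host/IP.
    private final Jedis redis = new Jedis("localhost");
    private static final SystemStream OUTPUT = new SystemStream("kafka", "alerts-enriched");

    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
      @SuppressWarnings("unchecked")
      Map<String, Object> alert = (Map<String, Object>) envelope.getMessage();
      String host = (String) alert.get("host");  // ip address or host name

      // Pull the contextual fields for this host and merge them into the event.
      alert.putAll(redis.hgetAll("context:" + host));

      collector.send(new OutgoingMessageEnvelope(OUTPUT, host, alert));
    }
  }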

What do you guys recommend?

- Shekar

On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <[email protected]> wrote:

> Hey Guys,
>
> I don't know a whole lot about Fluentd, but if you don't want to do this
> flow:
>
>   Fluentd -> Kafka -> Samza
>
> Then the alternative is:
>
>   Fluentd -> Samza
>
> The "direct" approach (no Kafka) is going to be pretty labor intensive to
> build. You'd have to:
>
> 1. Implement a FluentdSystemConsumer for Samza.
> 2. Write a Flutend data output plugin, which sends to the
> FluentdSystemConsumer.
> 3. Figure out a way for the Fluentd data output plugin to "discover" where
> the Samza FluentdSystemConsumer is located (since SamzaContainers are
> deployed to dynamic hosts in YARN, and move around a lot).
> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
> class (similar to the WikipediaSystemFactory in hello-samza).
> 5. Decide on some partitioning model that makes sense for Fluentd. Maybe
> one partition = one host? Not sure how Fluentd works here.
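>
> Just to give a rough sense of the surface area, a bare-bones sketch might
> look something like this (class names and the port config key are made up;
> the admin trick mirrors WikipediaSystemFactory):
>
>   import org.apache.samza.SamzaException;
>   import org.apache.samza.config.Config;
>   import org.apache.samza.metrics.MetricsRegistry;
>   import org.apache.samza.system.SystemAdmin;
>   import org.apache.samza.system.SystemConsumer;
>   import org.apache.samza.system.SystemFactory;
>   import org.apache.samza.system.SystemProducer;
>   import org.apache.samza.util.BlockingEnvelopeMap;
>   import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
>
>   public class FluentdSystemFactory implements SystemFactory {
>     public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
>       // Port the Fluentd output plugin would send to (hypothetical config key).
>       int port = config.getInt("systems." + systemName + ".port", 24224);
>       return new FluentdSystemConsumer(port);
>     }
>
>     public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
>       throw new SamzaException("This system does not support producing to Fluentd.");
>     }
>
>     public SystemAdmin getAdmin(String systemName, Config config) {
>       // Same trick the Wikipedia example uses: one partition, no offsets.
>       return new SinglePartitionWithoutOffsetsSystemAdmin();
>     }
>
>     // Step 1 above: accept Fluentd traffic and hand envelopes to Samza.
>     private static class FluentdSystemConsumer extends BlockingEnvelopeMap {
>       private final int port;
>       FluentdSystemConsumer(int port) { this.port = port; }
>       public void start() { /* open a socket on port, decode events, call put(ssp, envelope) */ }
>       public void stop()  { /* close the socket */ }
>     }
>   }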
>
> My instinct is that it's going to be *far* better to use the first
> approach (pipe the Fluentd events into Kafka). This will give you all of
> the semantics that Kafka provides (e.g. ordering within a partition,
> rewinding streams, durability, etc.).
>
> Cheers,
> Chris
>
> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>
> >I was also thinking of having Fluentd push to Samza, but I don't know how
> >to implement this. Not sure if adding a Kafka layer between Samza and
> >Fluentd is the only option.
> >
> >Do other guys have better ideas?
> >
> >Thanks,
> >
> >Fang, Yan
> >[email protected]
> >+1 (206) 849-4108
> >
> >
> >On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
> >
> >> Yan,
> >>
> >> Won't it add an additional hop? It did occur to me earlier, but I was not
> >> sure if this is the right way to go when we have a stringent SLA-driven
> >> system depending on it.
> >>
> >> - Shekar
> >>
> >>
> >> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
> >>
> >> > If you have already put the events into Kafka, you can make Samza consume
> >> > the Kafka topic, like the wikipedia-parse job in hello-samza consumes the
> >> > Kafka topic wikipedia-raw (see the config file).
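> >> >
> >> > The relevant lines look roughly like this (paraphrased from memory, not
> >> > copied verbatim from the repo; swap in your own job name, task class, and
> >> > topic):
> >> >
> >> >   job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >> >   job.name=wikipedia-parse
> >> >   task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
> >> >   task.inputs=kafka.wikipedia-raw
> >> >   systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> >   systems.kafka.consumer.zookeeper.connect=localhost:2181
> >> >   systems.kafka.producer.metadata.broker.list=localhost:9092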
> >> >
> >> > Thanks,
> >> >
> >> > Fang, Yan
> >> > [email protected]
> >> > +1 (206) 849-4108
> >> >
> >> >
> >> > > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
> >> >
> >> > > Awesome .. This works. Thanks a lot.
> >> > >
> >> > > Now off to my next step.
> >> > > I want to point to an incoming stream of events. These events are routed
> >> > > via Fluentd. So Fluentd acts as a routing layer that pushes the events to
> >> > > Kafka. Since it is a push and not a pull, any pointers on how to push it
> >> > > to Samza? Guessing I need a listener on Samza to collect this?
> >> > >
> >> > > - Shekar
> >> > >
> >> > >
> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
> >> > >
> >> > > > Aha, yes, we are almost there. I think I made a mistake in the previous
> >> > > > email.
> >> > > >
> >> > > > 1. modify *wikipedia-parser.properties*, NOT *wikipedia-feed.properties*
> >> > > > 2. run deploy/samza/bin/run-job.sh
> >> > > >    --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> >> > > >    --config-path=file://$PWD/deploy/samza/config/*wikipedia-parser.properties*
> >> > > >    (NOT *wikipedia-feed.properties*)
> >> > > >
> >> > > > Then you should see the messages in the Kafka topic *wikipedia-edits*.
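> >> > > >
> >> > > > For example, you can tail that topic with the console consumer bundled in
> >> > > > the Kafka that hello-samza deploys (adjust the ZooKeeper address if yours
> >> > > > differs):
> >> > > >
> >> > > >   deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wikipedia-edits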
> >> > > >
> >> > > > Thanks. Let me know if you have any luck. :)
> >> > > >
> >> > > > Cheers,
> >> > > >
> >> > > > Fang, Yan
> >> > > > [email protected]
> >> > > > +1 (206) 849-4108
> >> > > >
> >> > > >
> >> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
> >> > > >
> >> > > > > Just tried #3. Changed the property file wikipedia-feed.properties:
> >> > > > >
> >> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
> >> > > > >
> >> > > > > Ran:
> >> > > > >
> >> > > > > deploy/samza/bin/run-job.sh
> >> > > > >   --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> >> > > > >   --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
> >> > > > >
> >> > > > > I don't see any debug messages that I added to the feed or the parser
> >> > > > > file. I see messages on the kafka-consumer.
> >> > > > >
> >> > > > > However, the feed job died with the message below:
> >> > > > >
> >> > > > >
> >> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException: Trying to
> >> > > > > unlisten to a channel that has no listeners in it.
> >> > > > >
> >> > > > >   at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
> >> > > > >   at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> >> > > > >   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> > > > >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> > > > >   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> >> > > > >   at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
> >> > > > >   at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
> >> > > > >   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
> >> > > > >   at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> >> > > > > - Shekar
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
>
>
