Hey Shekar,

You can either run an external DB that holds the data set and query it
from your StreamTask, or you can use Samza's state store feature: push the
data into a stream, and store it in a partitioned key-value store that
lives alongside your StreamTasks. There is some documentation here about
the state store approach:

http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
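
To make that a bit more concrete, here's a rough, untested sketch of what
the enrichment task could look like with the state store approach. The
store name ("host-metadata"), the stream names ("host-metadata-updates",
"enriched-events"), and the "host" field are just placeholders -- you'd
define the real ones in your job config and message format:

import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class EnrichmentTask implements StreamTask, InitableTask {
  // Placeholder output stream -- wire the real name up in your job config.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "enriched-events");

  private KeyValueStore<String, Map<String, Object>> metadata;

  @SuppressWarnings("unchecked")
  @Override
  public void init(Config config, TaskContext context) {
    // Local, partitioned key-value store, declared in the job config and
    // restored from its changelog when the container restarts.
    metadata = (KeyValueStore<String, Map<String, Object>>) context.getStore("host-metadata");
  }

  @SuppressWarnings("unchecked")
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();

    if ("host-metadata-updates".equals(stream)) {
      // "Table" side: remember the latest metadata for this host/IP.
      // Assumes the metadata stream is keyed by host/IP.
      metadata.put((String) envelope.getKey(), (Map<String, Object>) envelope.getMessage());
    } else {
      // "Stream" side: enrich with a local lookup instead of a remote RPC call.
      Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
      Map<String, Object> extra = metadata.get((String) event.get("host"));
      if (extra != null) {
        event.putAll(extra);
      }
      collector.send(new OutgoingMessageEnvelope(OUTPUT, event));
    }
  }
}

The key thing is that the metadata stream is keyed by the same host/IP
field as your events, so each task's local store only ever holds the keys
for its own partition.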

Putting your data into a Kafka stream is going to require more up-front
effort from you, since you'll have to understand how Kafka's partitioning
model works and set up some pipeline to push the updates for your state.
In the long run, I believe it's the better approach, though. Local lookups
on a key-value store should be faster than doing remote RPC calls to a DB
for every message. Also note that if you take this approach, your state
must be partitioned in the same way as your input stream.
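
For the "pipeline to push the updates" part, a small keyed Kafka producer
can be enough. This is just a sketch (the topic name, broker address, and
JSON payload are made up); the important bits are creating the metadata
topic with the same partition count as your events topic and keying both
by the same field, so that the default partitioner co-partitions them:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MetadataPusher {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker list
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      // Key by the same field (host/IP) that your event stream is keyed by.
      // With the default partitioner and identical partition counts on both
      // topics, metadata updates land in the same partition as the matching
      // events, so the Samza task that sees the event also sees the metadata.
      String host = "app01.example.com";  // placeholder key
      String metadataJson = "{\"owner\":\"oncall-team\",\"sla\":\"gold\"}";
      producer.send(new ProducerRecord<>("host-metadata-updates", host, metadataJson));
    }
  }
}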

I'm sorry I can't give you a more definitive answer. It's really about
trade-offs.


Cheers,
Chris

On 9/8/14 10:17 AM, "Shekar Tippur" <[email protected]> wrote:

>Hello,
>
>I am able to read messages off of a new Kafka queue now.
>The next task is to enrich the data with more information. The data that
>is flowing in has an IP address or host name. I have a Redis cache where
>there is more contextual information (like owner of the alert, SLA, etc.).
>The data in Redis does not change often. Pretty much becomes a
>stream-table join.
>I can also dump the same data to a different Kafka queue and make it a
>stream-stream join as well.
>
>What do you guys recommend?
>
>- Shekar
>
>On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <
>[email protected]> wrote:
>
>> Hey Guys,
>>
>> I don't know a whole lot about Fluentd, but if you don't want to do this
>> flow:
>>
>>   Fluentd -> Kafka -> Samza
>>
>> Then the alternative is:
>>
>>   Fluentd -> Samza
>>
>> The "direct" approach (no Kafka) is going to be pretty labor intensive
>>to
>> build. You'd have to:
>>
>> 1. Implement a FluentdSystemConsumer for Samza.
>> 2. Write a Fluentd data output plugin, which sends to the
>> FluentdSystemConsumer.
>> 3. Figure out a way for the Fluentd data output plugin to "discover" where
>> the Samza FluentdSystemConsumer is located (since SamzaContainers are
>> deployed to dynamic hosts in YARN, and move around a lot).
>> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
>> class (similar to the WikipediaSystemFactory in hello-samza).
>> 5. Decide on some partitioning model that makes sense for Fluentd. Maybe
>> one partition = one host? Not sure how Fluentd works here.
>>
>> My instinct is that it's going to be *far* better to use the first
>> approach (pipe the Fluentd events into Kafka). This will give you all of
>> the semantics that Kafka provides (e.g. ordering within a partition,
>> rewinding streams, durability, etc.).
>>
>> Cheers,
>> Chris
>>
>> On 9/5/14 1:36 PM, "Yan Fang" <[email protected]> wrote:
>>
>> >I was also thinking of having Fluentd push to Samza, but I don't know
>> >how to implement this. Not sure if adding a Kafka layer between Samza
>> >and Fluentd is the only option.
>> >
>> >Do other guys have better ideas?
>> >
>> >Thanks,
>> >
>> >Fang, Yan
>> >[email protected]
>> >+1 (206) 849-4108
>> >
>> >
>> >On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <[email protected]> wrote:
>> >
>> >> Yan,
>> >>
>> >> Won't it add an additional hop? It did occur to me earlier, but I was
>> >> not sure if this is the right way to go if we have a stringent
>> >> SLA-driven system depending on it.
>> >>
>> >> - Shekar
>> >>
>> >>
>> >> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <[email protected]> wrote:
>> >>
>> >> > If you already put the events into Kafka, you can make Samza accept
>> >> > the Kafka topic, like the wikipedia-parse project in hello-samza
>> >> > accepts the Kafka topic wikipedia-raw (see the config file).
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Fang, Yan
>> >> > [email protected]
>> >> > +1 (206) 849-4108
>> >> >
>> >> >
>> >> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <[email protected]> wrote:
>> >> >
>> >> > > Awesome .. This works. Thanks a lot.
>> >> > >
>> >> > > Now off to my next step.
>> >> > > I want to point to an incoming stream of events. These events are
>> >> > > routed via Fluentd. So, Fluentd acts as a routing layer where it
>> >> > > pushes the events to Kafka. Since it is a push and not a pull, any
>> >> > > pointers on how to push it to Samza? Guessing I need a listener on
>> >> > > Samza to collect this?
>> >> > >
>> >> > > - Shekar
>> >> > >
>> >> > >
>> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <[email protected]> wrote:
>> >> > >
>> >> > > > Aha, yes, we are almost there. I think I made a mistake in the
>> >> > > > previous email.
>> >> > > >
>> >> > > > 1. modify the *wikipedia-parser.properties*, NOT
>> >> > > > *wikipedia-feed.properties*
>> >> > > > 2. run deploy/samza/bin/run-job.sh
>> >> > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> >> > > > --config-path=file://$PWD/deploy/samza/config/*wikipedia-parser.properties*
>> >> > > > *(NOT *wikipedia-feed.properties*)*
>> >> > > >
>> >> > > > Then you should see the messages in the kafka topic,
>> >> > > > *wikipedia-edits*
>> >> > > >
>> >> > > > Thanks. Let me know if you have any luck . :)
>> >> > > >
>> >> > > > Cheers,
>> >> > > >
>> >> > > > Fang, Yan
>> >> > > > [email protected]
>> >> > > > +1 (206) 849-4108
>> >> > > >
>> >> > > >
>> >> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <[email protected]> wrote:
>> >> > > >
>> >> > > > > Just tried #3. Changed the property file
>> >> > > > > wikipedia-feed.properties
>> >> > > > >
>> >> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> >> > > > >
>> >> > > > > Ran ..
>> >> > > > >
>> >> > > > > deploy/samza/bin/run-job.sh
>> >> > > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> >> > > > > --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
>> >> > > > >
>> >> > > > > I don't see any debug messages that I added to the feed or the
>> >> > > > > parser file..
>> >> > > > > I see messages on the kafka-consumer ..
>> >> > > > >
>> >> > > > > However the feed job died with the below message
>> >> > > > >
>> >> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException: Trying to
>> >> > > > > unlisten to a channel that has no listeners in it.
>> >> > > > >   at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
>> >> > > > >   at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
>> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>> >> > > > >   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> > > > >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> > > > >   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>> >> > > > >   at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
>> >> > > > >   at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
>> >> > > > >   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
>> >> > > > >   at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>> >> > > > >
>> >> > > > > - Shekar
>>
