Hey David,

Hmm, this is interesting. Some notes/questions:

1. How large is the per-partition state that we're talking about? We've
tested our RocksDB (0.8.0) implementation in excess of 80GB, and it
continues to perform at the same rate as smaller store sizes. The only
real trade-off appears to be restoration time, which seems to scale
linearly (2x the data takes 2x as long to restore). Depending on your
latency requirements, this may or may not be acceptable.
2. "And from my understanding of how Samza tasks work you can't consume
partition 0 from one topic and an arbitrary partition from another topic."
In 0.8.0, we allow pluggable partition assignment, using the
SystemStreamPartitionGrouper interface. There is still a limit that
prevents you from assigning the same partition to more than one task,
though. This is discussed in detail in this ticket:
https://issues.apache.org/jira/browse/SAMZA-353.
3. As Martin said, we ended up moving the YARN NMs off of the Kafka broker
nodes at LinkedIn due to page cache contention between the Kafka broker
and stateful Samza jobs. We actually published an engineering blog post
today (http://engineering.linkedin.com/samza/operating-apache-samza-scale)
that documents how we run Samza. Kafka brokers rely on page cache as their
"cache", so taking page cache away to use for stateful stream processors
can impact the latency and throughput of the brokers (they start hitting
disk more).
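Regarding (2), here's a rough sketch of the idea behind pluggable partition
assignment. The real interface is Samza's SystemStreamPartitionGrouper
(mapping a set of SystemStreamPartitions to a map of task name to partition
set); this sketch models streams and partitions as plain strings just to
show the grouping logic, so all the names here are illustrative, not the
real API:

```java
import java.util.*;

// Illustrative model of partition grouping. The real Samza interface maps
// Set<SystemStreamPartition> -> Map<TaskName, Set<SystemStreamPartition>>;
// here a stream partition is just a string like "customers#0".
public class GroupByPartitionSketch {

  // Assign partition N of every input stream to the same task, so the task
  // for partition 0 sees partition 0 of all streams (the default behavior).
  // A custom grouper could implement any other assignment, subject to the
  // limit that the same partition can't go to more than one task.
  static Map<String, Set<String>> group(Set<String> streamPartitions) {
    Map<String, Set<String>> assignment = new TreeMap<>();
    for (String sp : streamPartitions) {
      String partition = sp.substring(sp.indexOf('#') + 1);
      assignment
          .computeIfAbsent("Partition " + partition, k -> new TreeSet<>())
          .add(sp);
    }
    return assignment;
  }

  public static void main(String[] args) {
    Set<String> ssps = new TreeSet<>(Arrays.asList(
        "customers#0", "customers#1", "transactions#0", "transactions#1"));
    // Prints each task with the partitions it consumes across both streams.
    System.out.println(group(ssps));
  }
}
```

A custom grouper plugged in via job config could group by any function of the
stream and partition, which is exactly the knob SAMZA-353 discusses.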

In the past, we've been able to sidestep these kinds of problems by
either repartitioning the data, or by remotely querying a system and
caching the responses locally (to avoid DDoSing the remote system while
maintaining high throughput).
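That second pattern is roughly this: put a local cache in front of the remote
lookup so repeated keys don't hit the remote system on every message. A
minimal sketch, with the remote call stubbed out (a real job would call a
service or database, and would want a TTL or LRU eviction policy rather than
an unbounded map):

```java
import java.util.*;
import java.util.function.Function;

// Minimal "query remotely, cache locally" sketch. The remote lookup is a
// stub; the point is that only cache misses reach the remote system.
public class CachingLookup {
  private final Map<String, String> cache = new HashMap<>();
  private final Function<String, String> remote;
  int remoteCalls = 0; // counts how often we actually hit the remote system

  CachingLookup(Function<String, String> remote) {
    this.remote = remote;
  }

  String get(String key) {
    // Only invoke the remote lookup on a cache miss.
    return cache.computeIfAbsent(key, k -> {
      remoteCalls++;
      return remote.apply(k);
    });
  }

  public static void main(String[] args) {
    CachingLookup lookup = new CachingLookup(k -> "value-for-" + k);
    lookup.get("user-42");
    lookup.get("user-42"); // second call is served from the local cache
    System.out.println(lookup.remoteCalls); // only one remote call was made
  }
}
```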

Cheers,
Chris

On 11/19/14 9:18 AM, "David Pick" <[email protected]> wrote:

>Hey Milinda,
>
>Unfortunately, not all of our tables have the same join attribute to
>partition on. For example, several tables join to our customers table
>through another table, and since our producer is stateless we have no way
>of knowing which customer a given row was for.
>
>Customers => {:id int}
>Customer_Transactions => {:customer_id int, :transaction_id int}
>Transactions => {:id int}
>
>Given a schema like the one above, we could partition both the customers
>table and the customer_transactions table on the customer_id, but we
>couldn't partition the transactions table the same way. And from my
>understanding of how Samza tasks work you can't consume partition 0 from
>one topic and an arbitrary partition from another topic.
>
>David
>
>On Wed, Nov 19, 2014 at 11:04 AM, Milinda Pathirage
><[email protected]>
>wrote:
>
>> Hi David,
>>
>> One way to have more control over the number of partitions is to
>> partition based on a combination of shard identifier + join attribute.
>> This gives you more control over the number of partitions, and you can
>> change the number of partitions based on the availability of resources
>> and the amount of data processed at a single node. Also, this won't
>> affect the ordering.
>>
>> Thanks
>> Milinda
>>
>> On Wed, Nov 19, 2014 at 10:50 AM, David Pick <[email protected]>
>>wrote:
>>
>> > Hey Martin and Milinda,
>> >
>> > Thanks for the quick replies!
>> >
>> > Our primary database is Postgres, which we've sharded. To get data to
>> > Kafka we use a tool called PGQ
>> > (https://wiki.postgresql.org/wiki/PGQ_Tutorial), which is just a simple
>> > queueing system built inside of Postgres. So we have a database trigger
>> > that pushes any state change into PGQ. Then a small Clojure script
>> > picks up all the changes from the database and pushes them into Kafka,
>> > where there is a single topic called the datastream. The datastream
>> > topic has a partition for each shard of our database so that we can
>> > ensure messages from each individual Postgres instance come out of
>> > Kafka in the order we're expecting.
>> >
>> > If my understanding is correct, it seems that with our partitioning
>> > scheme we would have a Samza task for each partition of our datastream
>> > topic for a job that was generating data for something like our search
>> > platform. But given the amount of data we have, our LevelDB instance
>> > would get significantly larger than a few gigabytes. Is there a better
>> > way to partition the data that would keep those ordering guarantees?
>> >
>> > For my example we'd like to publish a feed of all posts with their
>> > associated user information merged onto the document.
>> >
>> > As for why we're storing the merged document, we certainly don't have
>> > to. I was just going off the example on the state management page
>> > (http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html)
>> > which says:
>> >
>> > *Example: Join a table of user profiles to a table of user settings by
>> > user_id and emit the joined stream*
>> >
>> > Implementation: The job subscribes to the change streams for the user
>> > profiles database and the user settings database, both partitioned by
>> > user_id. The job keeps a key-value store keyed by user_id, which
>> > contains the latest profile record and the latest settings record for
>> > each user_id. When a new event comes in from either stream, the job
>> > looks up the current value in its store, updates the appropriate fields
>> > (depending on whether it was a profile update or a settings update),
>> > and writes back the new joined record to the store. The changelog of
>> > the store doubles as the output stream of the task.
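(The join algorithm quoted above can be sketched roughly like this, using a
plain HashMap and string stubs in place of Samza's key-value store API and
real record types; all names here are hypothetical:)

```java
import java.util.*;

// Rough sketch of the profile/settings join: keep the latest record from
// each stream per user_id, and emit the merged record on every update.
public class UserJoinSketch {
  // Joined record: latest profile and latest settings for one user_id.
  static class Joined {
    String profile;   // latest profile record (stubbed as a string)
    String settings;  // latest settings record (stubbed as a string)
    @Override
    public String toString() {
      return "profile=" + profile + ", settings=" + settings;
    }
  }

  // Stands in for the key-value store keyed by user_id.
  private final Map<Integer, Joined> store = new HashMap<>();

  // Called for each incoming change event from either stream. In a real
  // job, writing to the store would also write to its changelog, which
  // doubles as the output stream of the task.
  Joined process(int userId, String streamName, String value) {
    Joined current = store.computeIfAbsent(userId, k -> new Joined());
    if (streamName.equals("user-profiles")) {
      current.profile = value;
    } else {
      current.settings = value;
    }
    return current; // the updated joined record
  }

  public static void main(String[] args) {
    UserJoinSketch job = new UserJoinSketch();
    job.process(1, "user-profiles", "alice-v1");
    System.out.println(job.process(1, "user-settings", "dark-mode"));
  }
}
```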
>> >
>> >
>> > Thanks,
>> >
>> > David
>> >
>> > On Wed, Nov 19, 2014 at 9:10 AM, Milinda Pathirage <
>> [email protected]>
>> > wrote:
>> >
>> > > Hi David,
>> > >
>> > > I am also wondering why you are storing the merged document in local
>> > > LevelDB. If you need to check for duplicates, how about using a Bloom
>> > > filter instead?
>> > >
>> > > Thanks
>> > > Milinda
>> > >
>> > > On Wed, Nov 19, 2014 at 8:50 AM, Martin Kleppmann <
>> [email protected]>
>> > > wrote:
>> > >
>> > > > Hi David,
>> > > >
>> > > > On 19 Nov 2014, at 04:52, David Pick <[email protected]> wrote:
>> > > > > First off, if this is the wrong place to ask these kinds of
>> > > > > questions, please let me know. I tried in IRC but didn't get an
>> > > > > answer within a few hours, so I'm trying here.
>> > > >
>> > > > It's the right place, and they're good questions :)
>> > > >
>> > > > > I had a couple of questions around implementing a table-to-table
>> > > > > join with data coming from a database changelog through Kafka.
>> > > >
>> > > > Cool! Out of curiosity, can I ask what database you're using and
>> > > > how you're getting the changelog?
>> > > >
>> > > > > Let's say I've got two tables, users and posts, in my primary db
>> > > > > where the posts table has a user_id column. I've written a Samza
>> > > > > job that joins those two tables together by storing every user
>> > > > > record and the merged document in LevelDB and then outputting the
>> > > > > resulting document to the changelog Kafka topic.
>> > > >
>> > > > Not sure exactly what you mean by a merged document here. Is it a
>> > > > list of all the posts by a particular user? Or post records
>> > > > augmented with user information? Or something else?
>> > > >
>> > > > > Is this the right way to implement that kind of job?
>> > > >
>> > > > On a high level, this sounds reasonable. One detail question is
>> > > > whether you're using the changelog feature for the LevelDB store.
>> > > > If the contents of the store can be rebuilt by reprocessing the
>> > > > input stream, you could get away with turning off the changelog on
>> > > > the store and making the input stream a bootstrap stream instead.
>> > > > That will save some overhead on writes, but still give you fault
>> > > > tolerance.
>> > > >
>> > > > Another question is whether the values you're storing in LevelDB
>> > > > are several merged records together, or whether you store each
>> > > > record individually and use a range query to retrieve them if
>> > > > necessary. You could benchmark which works best for you.
>> > > >
>> > > > > It seems that even with a decent partitioning scheme the LevelDB
>> > > > > instance in each task will get quite large, especially if we're
>> > > > > joining several tables together that have millions of rows (our
>> > > > > real-world use case would be 7 or 8 tables, each with many
>> > > > > millions of records).
>> > > >
>> > > > The goal of Samza's local key-value stores is to support a few
>> > > > gigabytes per partition. So millions of rows distributed across
>> > > > (say) tens of partitions should be fine, assuming the rows aren't
>> > > > huge. You might want to try the new RocksDB support, which may
>> > > > perform better than LevelDB.
>> > > >
>> > > > > Also, given a task that's processing that much data, where do you
>> > > > > recommend running Samza? Should we spin up another set of boxes,
>> > > > > or is it OK to run it on the Kafka brokers? (I heard it mentioned
>> > > > > that this is how LinkedIn is running Samza.)
>> > > >
>> > > > IIRC LinkedIn saw some issues with the page cache when Kafka and
>> > > > LevelDB were on the same machine, which were resolved by putting
>> > > > them on different machines. But I'll let one of the LinkedIn folks
>> > > > comment on that.
>> > > >
>> > > > Best,
>> > > > Martin
>> > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > Milinda Pathirage
>> > >
>> > > PhD Student | Research Assistant
>> > > School of Informatics and Computing | Data to Insight Center
>> > > Indiana University
>> > >
>> > > twitter: milindalakmal
>> > > skype: milinda.pathirage
>> > > blog: http://milinda.pathirage.org
>> > >
>> >
>>
>>
>>
>> --
>> Milinda Pathirage
>>
>> PhD Student | Research Assistant
>> School of Informatics and Computing | Data to Insight Center
>> Indiana University
>>
>> twitter: milindalakmal
>> skype: milinda.pathirage
>> blog: http://milinda.pathirage.org
>>
