Hey Martin and Milinda,

Thanks for the quick replies!

Our primary database is Postgres, which we've sharded. To get data into
Kafka we use a tool called PGQ (https://wiki.postgresql.org/wiki/PGQ_Tutorial),
which is a simple queueing system built inside Postgres. We have a database
trigger that pushes every state change into PGQ, and a small Clojure script
then picks up the changes from the database and pushes them into Kafka,
where there is a single topic called the datastream. The datastream topic
has a partition for each shard of our database, so we can ensure that
messages from each individual Postgres instance come out of Kafka in the
order we're expecting.
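
To make the shard-to-partition mapping concrete, here's a rough sketch of
the publishing side in Java (our real script is Clojure, and the class name,
broker address and message shape here are made up). It just routes each
change event to the datastream partition whose index matches the shard the
event came from:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DatastreamPublisher {
    private final KafkaProducer<String, String> producer;

    public DatastreamPublisher(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    // shardId and changeJson come from the PGQ batch we just consumed.
    public void publish(int shardId, String primaryKey, String changeJson) {
        // Pinning the partition to the shard id means each Postgres
        // instance maps to exactly one partition, so its changes stay
        // in order.
        producer.send(new ProducerRecord<>("datastream", shardId, primaryKey, changeJson));
    }
}

So changes for shard 3 always land in partition 3, and ordering within a
shard is preserved end to end.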

If my understanding is correct, it seems that with our partitioning scheme
we would have a Samza task for each partition of the datastream topic for a
job that was generating data for something like our search platform. But
given the amount of data we have, each LevelDB instance would get
significantly larger than a few gigabytes. Is there a better way to
partition the data that would keep those ordering guarantees?

For my example, we'd like to publish a feed of all posts with the
associated user information merged onto each post document.

As for why we're storing the merged document, we certainly don't have to. I
was just going off the example on the state management page (
http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html)
which says:

*Example: Join a table of user profiles to a table of user settings by
user_id and emit the joined stream*

Implementation: The job subscribes to the change streams for the user
profiles database and the user settings database, both partitioned by
user_id. The job keeps a key-value store keyed by user_id, which contains
the latest profile record and the latest settings record for each user_id.
When a new event comes in from either stream, the job looks up the current
value in its store, updates the appropriate fields (depending on whether it
was a profile update or a settings update), and writes back the new joined
record to the store. The changelog of the store doubles as the output
stream of the task.
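
For our posts/users case, I imagine the task would look roughly like the
sketch below (the store, stream and output topic names are made up, the
messages are treated as plain maps, and it assumes both input streams are
partitioned by user_id). It only keeps the user side in the store;
re-emitting existing posts when a user record changes is the part that
pushed us toward storing the merged documents as well:

import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class PostUserJoinTask implements StreamTask, InitableTask {
    private static final SystemStream OUTPUT =
        new SystemStream("kafka", "posts-with-users");

    // Keyed by user_id: the latest user record seen for that user.
    private KeyValueStore<String, Map<String, Object>> users;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        users = (KeyValueStore<String, Map<String, Object>>) context.getStore("users");
    }

    @Override
    @SuppressWarnings("unchecked")
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        String stream = envelope.getSystemStreamPartition().getStream();
        Map<String, Object> record = (Map<String, Object>) envelope.getMessage();

        if ("users".equals(stream)) {
            // Keep the latest version of the user row.
            users.put((String) record.get("id"), record);
        } else if ("posts".equals(stream)) {
            // Merge the stored user record onto the post and emit it.
            Map<String, Object> merged = new HashMap<>(record);
            merged.put("user", users.get((String) record.get("user_id")));
            collector.send(new OutgoingMessageEnvelope(OUTPUT, record.get("id"), merged));
        }
    }
}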


Thanks,

David

On Wed, Nov 19, 2014 at 9:10 AM, Milinda Pathirage <[email protected]>
wrote:

> Hi David,
>
> I am also wondering why you are storing the merged document in local
> LevelDB. If you need to check for duplicates, how about using a bloom
> filter to handle them?
>
> Thanks
> Milinda
>
> On Wed, Nov 19, 2014 at 8:50 AM, Martin Kleppmann <[email protected]>
> wrote:
>
> > Hi David,
> >
> > On 19 Nov 2014, at 04:52, David Pick <[email protected]> wrote:
> > > First off, if this is the wrong place to ask these kinds of questions
> > > please let me know. I tried in IRC but didn't get an answer within a
> few
> > > hours so I'm trying here.
> >
> > It's the right place, and they're good questions :)
> >
> > > I had a couple of questions around implementing a table to table join
> > with
> > > data coming from a database changelog through Kafka.
> >
> > Cool! Out of curiosity, can I ask what database you're using and how
> > you're getting the changelog?
> >
> > > Let's say I've got two tables users and posts in my primary db where
> the
> > > posts table has a user_id column. I've written a Samza job that joins
> > those
> > > two tables together by storing every user record and the merged
> document
> > in
> > > Leveldb and then outputing the resulting document to the changelog
> Kafka
> > > topic.
> >
> > Not sure exactly what you mean with a merged document here. Is it a list
> > of all the posts by a particular user? Or post records augmented with
> user
> > information? Or something else?
> >
> > > Is this the right way to implement that kind of job?
> >
> > On a high level, this sounds reasonable. One detail question is whether
> > you're using the changelog feature for the LevelDB store. If the contents
> > of the store can be rebuilt by reprocessing the input stream, you could
> get
> > away with turning off the changelog on the store, and making the input
> > stream a bootstrap stream instead. That will save some overhead on
> writes,
> > but still give you fault tolerance.
> >
> > Another question is whether the values you're storing in LevelDB are
> > several merged records together, or whether you store each record
> > individually and use a range query to retrieve them if necessary. You
> could
> > benchmark which works best for you.
> >
> > > It seems that even
> > > with a decent partitioning scheme the leveldb instance in each task
> will
> > > get quite large, especially if we're joining several tables together
> that
> > > have millions of rows (our real world use case would be 7 or 8 tables
> > each
> > > with many millions of records).
> >
> > The goal of Samza's local key-value stores is to support a few gigabytes
> > per partition. So millions of rows distributed across (say) tens of
> > partitions should be fine, assuming the rows aren't huge.  You might want
> > to try the new RocksDB support, which may perform better than LevelDB.
> >
> > > Also, given a task that's processing that much data where do you
> > > recommended running Samza? Should we spin up another set of boxes or is
> > it
> > > ok to run it on the Kafka brokers (I heard it mentioned that this is
> how
> > > LinkedIn is running Samza).
> >
> > IIRC LinkedIn saw some issues with the page cache when Kafka and LevelDB
> > were on the same machine, which was resolved by putting them on different
> > machines. But I'll let one of the LinkedIn folks comment on that.
> >
> > Best,
> > Martin
> >
> >
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>
