My 2 cents => One thing to note about the push model: multi-tenancy. When your storage system (Druid, for example) is used in a multi-tenant fashion, the push model is a bit difficult to operate, primarily because there is no real feedback loop from the storage system. Yes, if the storage system starts doing badly you get timeouts and higher latencies, but at that point you're already in a position where you're probably breaking SLAs (for some tenant).

In that sense, a pull model might be better, since the consumer can potentially have more visibility into how a particular node is doing. Also, the Kafka consumer batches things up, so theoretically you could get similar throughput. The downside of this approach, of course, is that the storage system's partitioning scheme *has to* line up with the Kafka partitioning scheme.
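To make the pull idea concrete, here is a minimal sketch (in Java) of a consumer co-located with a single storage node, including the feedback loop mentioned above. The LocalStore interface is hypothetical (ownedTopicPartitions, p99ReadLatencyMillis and put are made up for illustration), the 50 ms threshold is arbitrary, and it assumes the Kafka partitioning lines up with the store's partitioning; the Kafka calls are the standard consumer assign/poll/pause/resume/commit API.

import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/** Pull-model ingester co-located with a single storage node (sketch). */
public class CoLocatedIngester {

    /** Hypothetical client for the local serving process; not a real library. */
    interface LocalStore {
        Collection<TopicPartition> ownedTopicPartitions(); // partitions this node serves
        double p99ReadLatencyMillis();                      // e.g. read via JMX
        void put(byte[] key, byte[] value);                 // local single put
    }

    private final LocalStore store;
    private final KafkaConsumer<byte[], byte[]> consumer;

    public CoLocatedIngester(LocalStore store, Properties kafkaProps) {
        this.store = store;
        this.consumer = new KafkaConsumer<>(kafkaProps);
    }

    public void run() {
        // Only consume the partitions this storage node already owns, which
        // requires the Kafka and storage partitioning schemes to line up.
        consumer.assign(store.ownedTopicPartitions());

        while (true) {
            // Feedback loop: throttle ingestion when local read latency degrades.
            if (store.p99ReadLatencyMillis() > 50.0) {
                consumer.pause(consumer.assignment());
            } else {
                consumer.resume(consumer.assignment());
            }

            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Local put: the round-trip ack is near-instantaneous.
                store.put(record.key(), record.value());
            }
            // Checkpoint offsets only after the local writes have been applied,
            // so each node can rewind and resume independently of its replicas.
            consumer.commitSync();
        }
    }
}

The same shape applies to any serving system that exposes a local write path and some health metric; nothing above is specific to a particular store.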
On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover <[email protected]> wrote:

> Felix,
>
> I see your point about simple Kafka consumers. My thought was that if you're already managing a Samza/YARN deployment, then these types of jobs would be "just another job" and not require an additional process management/monitoring/operations setup. If you've already got a way to handle vanilla Kafka jobs, then it makes sense.
>
> For the push model, the way we're planning to deal with the latency of round-trip calls is to batch up pushes to the downstream system. Both Druid Tranquility and the ES transport node protocol allow you to batch index requests. I'm curious whether pull would be that much more efficient.
>
> Cheers,
>
> Roger
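As an illustration of the batched-push approach Roger describes above, here is a rough sketch (in Java) using the Elasticsearch bulk API through a transport client. Client construction is omitted because it varies by ES version, and the index/type names, batch size and error handling are placeholders rather than anything from this thread.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

/** Buffers documents and pushes them to Elasticsearch in bulk (sketch). */
public class BatchedEsPusher {

    private final Client client;   // transport client; construction omitted (version-specific)
    private final int batchSize;
    private final List<Map<String, Object>> buffer = new ArrayList<>();

    public BatchedEsPusher(Client client, int batchSize) {
        this.client = client;
        this.batchSize = batchSize;
    }

    /** Queue a document and flush once the batch is full. */
    public void add(Map<String, Object> doc) {
        buffer.add(doc);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** One bulk request, i.e. one acknowledged round trip, per batch. */
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        BulkRequestBuilder bulk = client.prepareBulk();
        for (Map<String, Object> doc : buffer) {
            // "events"/"event" are placeholder index/type names.
            bulk.add(client.prepareIndex("events", "event").setSource(doc));
        }
        BulkResponse response = bulk.get();
        if (response.hasFailures()) {
            throw new RuntimeException("Bulk indexing failed: " + response.buildFailureMessage());
        }
        buffer.clear();
    }
}

Per Roger's note, Tranquility plays a broadly similar batching role on the Druid side.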
> On Wed, Apr 1, 2015 at 10:26 AM, Felix GV <[email protected]> wrote:
>
> > Hi Roger,
> >
> > You bring up good points, and I think the short answer is that there are trade-offs to everything, of course (:
> >
> > What I described could definitely be implemented as a Samza job, and I think that would make a lot of sense if the data serving system was also deployed via YARN. This way, the Samza tasks responsible for ingesting and populating the data serving system's nodes could be spawned wherever YARN knows these nodes are located. For data serving systems not well integrated with YARN, however, I'm not sure that there would be that much win in using the Samza deployment model. And since the consumers themselves are pretty simple (no joining of streams, no local state, etc.), this seems to be a case where Samza is a bit overkill and a regular Kafka consumer is perfectly fine (except for the YARN-enabled auto-deployment aspect, like I mentioned).
> >
> > As for push versus pull, I think the trade-off is the following: push is mostly simpler and more decoupled, as you said, but I think pull would be more efficient. The reason is that Kafka consumption is very efficient (thanks to batching and compression), but most data serving systems don't provide a streaming ingest API for pushing data efficiently to them; instead, they have single-record put/insert APIs which require a round trip to be acknowledged. This is perfectly fine in low-throughput scenarios, but does not support the very high ingestion throughput that Kafka can provide. Co-locating the pulling process (i.e. the Kafka consumer) with the data serving node makes single puts a bit more affordable, since the (local) round-trip acks would be near-instantaneous. Pulling also makes the tracking of offsets across different nodes a bit easier, since each node can consume at its own pace and resume at whatever point in the past it needs (i.e. rewind) without affecting the other replicas. Tracking offsets across many replicas in the push model is a bit more annoying, though still doable, of course.
> >
> > --
> >
> > Felix GV
> > Data Infrastructure Engineer
> > Distributed Data Systems
> > LinkedIn
> >
> > [email protected]
> > linkedin.com/in/felixgv
> >
> > ________________________________________
> > From: Roger Hoover [[email protected]]
> > Sent: Tuesday, March 31, 2015 8:57 PM
> > To: [email protected]
> > Subject: Re: How do you serve the data computed by Samza?
> >
> > Ah, thanks for the great explanation. Any particular reason that the job(s) you described should not be Samza jobs?
> >
> > We've started experimenting with such jobs for Druid and Elasticsearch. For Elasticsearch, the Samza job containers join the Elasticsearch cluster as transport nodes and use the Java API to push to ES data nodes. Likewise for Druid, the Samza job uses the Tranquility API to schedule jobs (
> > https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
> > ).
> >
> > The nice part about push versus pull is that the downstream system does not need plugins (like ES rivers) that may complicate its configuration or destabilize the system.
> >
> > Cheers,
> >
> > Roger
> >
> > On Tue, Mar 31, 2015 at 10:56 AM, Felix GV <[email protected]> wrote:
> >
> > > Thanks for your reply Roger! Very insightful (:
> > >
> > > > 6. If there was a highly-optimized and reliable way of ingesting partitioned streams quickly into your online serving system, would that help you leverage Samza more effectively?
> > >
> > > >> 6. Can you elaborate please?
> > >
> > > Sure. The feature set I have in mind is the following:
> > >
> > >    * Provide a thinly-wrapped Kafka producer which does appropriate partitioning and includes useful metadata (such as a production timestamp, etc.) alongside the payload (see the sketch after this list). This producer would be used in the last step of processing of a Samza topology, in order to emit to Kafka some processed/joined/enriched data which is destined for online serving.
> > >    * Provide a consumer process which can be co-located on the same hosts as your data serving system. This process consumes from the appropriate partitions and checkpoints its offsets on its own. It leverages Kafka batching and compression to make consumption very efficient.
> > >    * For each record, the consumer process issues a put/insert locally to the co-located serving process. Since this is a local operation, it is also very cheap and efficient.
> > >    * The consumer process can also optionally throttle its insertion rate by monitoring some performance metrics of the co-located data serving process. For example, if the data serving process exposes a p99 latency via JMX or other means, this can be used in a tight feedback loop to back off if read latency degrades beyond a certain threshold.
> > >    * This ingestion platform should be easy to integrate with any consistently-routed data serving system, by implementing some simple interfaces to let the ingestion system understand the key-to-partition assignment strategy, as well as the partition-to-node assignment strategy. Optionally, a hook to access performance metrics could also be implemented if throttling is deemed important (as described in the previous point).
> > >    * Since the consumer lives in a separate process, the system benefits from good isolation guarantees. The consumer process can be capped to a low amount of heap, and its GC is inconsequential for the serving platform. It's also possible to bounce the consumer and data serving processes independently of each other, if need be.
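A minimal sketch (in Java) of the thinly-wrapped producer from the first point in the list above. The modulo-hash partitioning, the timestamp-plus-payload envelope, and the class/topic naming are illustrative assumptions; a real wrapper would have to match whatever key-to-partition strategy the serving system actually uses.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Thin producer wrapper: consistent partitioning plus a production timestamp
 *  carried alongside the payload (sketch; the envelope layout is made up). */
public class ServingStreamProducer {

    private final Producer<byte[], byte[]> producer;
    private final String topic;

    public ServingStreamProducer(Properties kafkaProps, String topic) {
        this.producer = new KafkaProducer<>(kafkaProps);
        this.topic = topic;
    }

    public void emit(byte[] key, byte[] value) {
        // Partition deterministically on the key so the assignment can match the
        // serving system's key-to-partition strategy (assumed here to be hash modulo).
        int numPartitions = producer.partitionsFor(topic).size();
        int partition = (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;

        // Envelope: an 8-byte production timestamp followed by the payload.
        ByteBuffer envelope = ByteBuffer.allocate(8 + value.length);
        envelope.putLong(System.currentTimeMillis());
        envelope.put(value);

        producer.send(new ProducerRecord<>(topic, partition, key, envelope.array()));
    }

    public void close() {
        producer.close();
    }
}

On the consuming end, this pairs with the co-located ingester sketched earlier in the thread, which would strip the timestamp back off the envelope before issuing the local put.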
> > >
> > > There are some more nuances and additional features which could be nice to have, but that's the general idea.
> > >
> > > It seems to me like such a system would be valuable, but I'm wondering what other people in the open-source community think, which is why I was interested in starting this thread...
> > >
> > > Thanks for your feedback!
> > >
> > > -F

--
Thanks and regards,
Chinmay Soman
