Thanks for the great explanation, Felix!

On Thu, Apr 2, 2015 at 4:08 PM, Felix GV <[email protected]> wrote:
> Hi Roger,
>
> The slow storage shard situation is indeed a concern.
>
> The slow storage shard will back up your pusher process for all shards if the incoming Kafka stream partitions don't line up. Alternatively, your pusher process will keep going with the healthy shards but will then need to re-consume the whole input stream just to push to the unhealthy shard after it recovered. LinkedIn already has (at least) one or two such (push) systems and they definitely work, but they have some operational limitations, as discussed here.
>
> Now, you can make the pusher process' input topic partitions line up with the storage partitions, and that would alleviate the slow shard problem, but if you go through the trouble of partitioning your input stream this way, then you're not really far off from the pull model anyway. Furthermore, even with lined-up partitioning, the push approach will still have the following caveats compared to pull:
>
>    1. An extra hop. The hop is probably not a big deal in terms of latency, but it does double your bandwidth requirements (and more than double if the storage system does not support batch operations which are as efficient as Kafka's).
>    2. Smart throttling. You can throttle with a push approach, but it is very crude. Likely the throttling would be a fixed QPS per topic or something like that. If you want to take into account the serving latency of the storage nodes, in order to back off when the storage node is under higher load, then it seems easier to achieve that accurately if you're co-located with the storage node, as you would be able to poll its performance metrics quicker and more often.
>    3. Multi-tenancy. If you want many different topics getting fed into the storage system, but with different priorities for each, then it may also be easier to prioritize the various streams against one another in the pull model. If your push process consumes partitions that line up with the storage, then technically you can achieve the same thing, but then your push process deployment and config become a bit more complex. The pushers are not just any stateless auto-balanced consumer processes anymore; they need to be tied 1:1 or 1:M with the storage nodes. At this point, you have already gone about 80% of the way towards the pull model, so I would argue it makes things simpler to just go all the way.
>
> Of course, there are definitely many ways to skin this cat. And some of the concerns above shouldn't be someone's highest priority if they're just getting started playing around with Samza. But I think it would be nice to have an open-source system available which "does all the right things", so that even newcomers to Samza can have an easy way to ingest data into their serving system.
>
> My 2 cents.
>
> --
>
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
>
> [email protected]
> linkedin.com/in/felixgv
>
> ________________________________________
> From: Roger Hoover [[email protected]]
> Sent: Thursday, April 02, 2015 3:24 PM
> To: [email protected]
> Subject: Re: How do you serve the data computed by Samza?
>
> Is it because the Kafka partitioning might not be the same as the storage partitioning? So that a slow storage shard will prevent unrelated shards from getting their messages?
>
> Ah, I think I see what you mean. If so, then the solution is to make the Kafka partitioning match the storage partitioning. In that case, push or pull is the same, yeah?
>
> Thanks,
>
> Roger
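To make the "smart throttling" idea from Felix's point 2 above a bit more concrete, here is a rough sketch of the feedback loop a co-located consumer could run between batches of local puts. Everything here is illustrative only: StorageNodeMetrics is a hypothetical hook (it could be backed by the serving process's JMX metrics), not an existing API, and the thresholds are arbitrary.

/**
 * Sketch of a latency-aware throttle for a co-located ingest consumer.
 * StorageNodeMetrics is a hypothetical hook, e.g. backed by the serving
 * process's JMX metrics; it is not part of any real library.
 */
public class LatencyAwareThrottle {

    /** Hypothetical metrics hook exposed by the co-located serving process. */
    public interface StorageNodeMetrics {
        double p99ReadLatencyMillis();
    }

    private final StorageNodeMetrics metrics;
    private final double latencyThresholdMillis; // back off above this p99
    private long backoffMillis = 0;              // current pause between batches

    public LatencyAwareThrottle(StorageNodeMetrics metrics, double latencyThresholdMillis) {
        this.metrics = metrics;
        this.latencyThresholdMillis = latencyThresholdMillis;
    }

    /** Call once per batch of local puts; sleeps if the serving node looks overloaded. */
    public void maybeBackOff() throws InterruptedException {
        double p99 = metrics.p99ReadLatencyMillis();
        if (p99 > latencyThresholdMillis) {
            // Read latency is degraded: double the pause, up to a cap.
            backoffMillis = Math.min(backoffMillis == 0 ? 50 : backoffMillis * 2, 5_000);
        } else {
            // Healthy again: release the brake gradually.
            backoffMillis = backoffMillis / 2;
        }
        if (backoffMillis > 0) {
            Thread.sleep(backoffMillis);
        }
    }
}

Because the consumer sits on the same host as the serving process, polling that metric once per batch is cheap, which is the "poll its performance metrics quicker and more often" advantage of the pull model.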
> On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover <[email protected]> wrote:
>
> > Chinmay,
> >
> > Thanks for your input.
> >
> > I'm not understanding what the difference is. With the design that Felix laid out, the co-located Kafka consumer is still doing a push to the storage system, right? It just happens to be on the same machine. How is this different from pushing batches from a non-local Samza job? How does the pull-based approach you're thinking of deal with feedback and SLAs?
> >
> > Thanks,
> >
> > Roger
> >
> > On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman <[email protected]> wrote:
> >
> >> My 2 cents => One thing to note about the push model: multi-tenancy.
> >>
> >> When your storage system (Druid for example) is used in a multi-tenant fashion, the push model is a bit difficult to operate, primarily because there is no real feedback loop from the storage system. Yes, if the storage system starts doing badly, then you get timeouts and higher latencies, but at that point you're already in a position where you're probably breaking SLAs (for some tenant).
> >>
> >> In that sense, a pull model might be better since the consumer can potentially have more visibility into how this particular node is doing. Also, the Kafka consumer batches things up, so theoretically you could get similar throughput. The downside of this approach is of course that the storage system partitioning scheme *has to* line up with the Kafka partitioning scheme.
> >>
> >> On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover <[email protected]> wrote:
> >>
> >> > Felix,
> >> >
> >> > I see your point about simple Kafka consumers. My thought was that if you're already managing a Samza/YARN deployment then these types of jobs would be "just another job" and not require an additional process management/monitoring/operations setup. If you've already got a way to handle vanilla Kafka jobs then it makes sense.
> >> >
> >> > For the push model, the way we're planning to deal with the latency of round-trip calls is to batch up pushes to the downstream system. Both Druid Tranquility and the ES transport node protocol allow you to batch index requests. I'm curious if pull would be that much more efficient.
> >> >
> >> > Cheers,
> >> >
> >> > Roger
> >> >
> >> > On Wed, Apr 1, 2015 at 10:26 AM, Felix GV <[email protected]> wrote:
> >> >
> >> > > Hi Roger,
> >> > >
> >> > > You bring up good points, and I think the short answer is that there are trade-offs to everything, of course (:
> >> > >
> >> > > What I described could definitely be implemented as a Samza job, and I think that would make a lot of sense if the data serving system was also deployed via YARN. This way, the Samza tasks responsible for ingesting and populating the data serving system's nodes could be spawned wherever YARN knows these nodes are located. For data serving systems not well integrated with YARN, however, I'm not sure that there would be that much win in using the Samza deployment model. And since the consumers themselves are pretty simple (no joining of streams, no local state, etc.), this seems to be a case where Samza is a bit overkill and a regular Kafka consumer is perfectly fine (except for the YARN-enabled auto-deployment aspect, like I mentioned).
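As an aside on the "batch up pushes" approach mentioned a bit further up: a push-style Samza task that buffers records and flushes them in bulk might look roughly like the sketch below. The Samza task interfaces are real, but BulkWriter is a made-up placeholder for whatever bulk client is used (for example the ES bulk API via a transport client, or a Tranquility beam), and a real job would build the writer in an init() method from config rather than take it in the constructor.

import java.util.ArrayList;
import java.util.List;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

/**
 * Sketch of a push-style Samza task that buffers incoming records and writes
 * them downstream in bulk. BulkWriter is a hypothetical stand-in for the real
 * bulk client; it is not an existing API.
 */
public class BulkPushTask implements StreamTask, WindowableTask {

    /** Hypothetical bulk client for the downstream serving system. */
    public interface BulkWriter {
        void writeBatch(List<Object> records);
    }

    private static final int MAX_BATCH_SIZE = 1000;

    private final BulkWriter writer;
    private final List<Object> buffer = new ArrayList<>();

    // For brevity only; a real Samza job would create the writer in
    // InitableTask#init() from config, since tasks need a no-arg constructor.
    public BulkPushTask(BulkWriter writer) {
        this.writer = writer;
    }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        buffer.add(envelope.getMessage());
        if (buffer.size() >= MAX_BATCH_SIZE) {
            flush();
        }
    }

    @Override
    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        // Flush on the configured window interval so small batches don't sit around forever.
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.writeBatch(new ArrayList<>(buffer)); // one round-trip for the whole batch
        buffer.clear();
    }
}

The point is simply that each flush is a single round-trip for the whole batch, which is what makes the extra hop of the push model tolerable in practice.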
> >> > > As for push versus pull, I think the trade-off is the following: push is mostly simpler and more decoupled, as you said, but I think pull would be more efficient. The reason for that is that Kafka consumption is very efficient (thanks to batching and compression), but most data serving systems don't provide a streaming ingest API for pushing data efficiently to them; instead they have single-record put/insert APIs which require a round-trip to be acknowledged. This is perfectly fine in low-throughput scenarios, but does not support the very high ingestion throughput that Kafka can provide. By co-locating the pulling process (i.e. the Kafka consumer) with the data serving node, it becomes a bit more affordable to do single puts, since the (local) round-trip acks would be near-instantaneous. Pulling also makes the tracking of offsets across different nodes a bit easier, since each node can consume at its own pace, and resume at whatever point in the past it needs (i.e. rewind) without affecting the other replicas. Tracking offsets across many replicas in the push model is a bit more annoying, though still doable, of course.
> >> > >
> >> > > --
> >> > >
> >> > > Felix GV
> >> > > Data Infrastructure Engineer
> >> > > Distributed Data Systems
> >> > > LinkedIn
> >> > >
> >> > > [email protected]
> >> > > linkedin.com/in/felixgv
> >> > >
> >> > > ________________________________________
> >> > > From: Roger Hoover [[email protected]]
> >> > > Sent: Tuesday, March 31, 2015 8:57 PM
> >> > > To: [email protected]
> >> > > Subject: Re: How do you serve the data computed by Samza?
> >> > >
> >> > > Ah, thanks for the great explanation. Any particular reason that the job(s) you described should not be Samza jobs?
> >> > >
> >> > > We've started experimenting with such jobs for Druid and Elasticsearch. For Elasticsearch, the Samza job containers join the Elasticsearch cluster as transport nodes and use the Java API to push to the ES data nodes. Likewise for Druid, the Samza job uses the Tranquility API to schedule jobs (https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza).
> >> > >
> >> > > The nice part about push versus pull is that the downstream system does not need plugins (like ES rivers) that may complicate its configuration or destabilize the system.
> >> > >
> >> > > Cheers,
> >> > >
> >> > > Roger
> >> > >
> >> > > On Tue, Mar 31, 2015 at 10:56 AM, Felix GV <[email protected]> wrote:
> >> > >
> >> > > > Thanks for your reply Roger! Very insightful (:
> >> > > >
> >> > > > > 6. If there was a highly-optimized and reliable way of ingesting partitioned streams quickly into your online serving system, would that help you leverage Samza more effectively?
> >> > > >
> >> > > >> 6. Can you elaborate please?
> >> > > >
> >> > > > Sure. The feature set I have in mind is the following:
> >> > > >
> >> > > >    * Provide a thinly-wrapped Kafka producer which does appropriate partitioning and includes useful metadata (such as production timestamp, etc.) alongside the payload. This producer would be used in the last step of processing of a Samza topology, in order to emit to Kafka some processed/joined/enriched data which is destined for online serving.
> >> > > >    * Provide a consumer process which can be co-located on the same hosts as your data serving system. This process consumes from the appropriate partitions and checkpoints its offsets on its own. It leverages Kafka batching and compression to make consumption very efficient.
> >> > > >    * For each record, the consumer process issues a put/insert locally to the co-located serving process. Since this is a local operation, it is also very cheap and efficient.
> >> > > >    * The consumer process can also optionally throttle its insertion rate by monitoring some performance metrics of the co-located data serving process. For example, if the data serving process exposes a p99 latency via JMX or other means, this can be used in a tight feedback loop to back off if read latency degrades beyond a certain threshold.
> >> > > >    * This ingestion platform should be easy to integrate with any consistently-routed data serving system, by implementing some simple interfaces to let the ingestion system understand the key-to-partition assignment strategy, as well as the partition-to-node assignment strategy. Optionally, a hook to access performance metrics could also be implemented if throttling is deemed important (as described in the previous point).
> >> > > >    * Since the consumer lives in a separate process, the system benefits from good isolation guarantees. The consumer process can be capped to a low amount of heap, and its GC is inconsequential for the serving platform. It's also possible to bounce the consumer and data serving processes independently of each other, if need be.
> >> > > >
> >> > > > There are some more nuances and additional features which could be nice to have, but that's the general idea.
> >> > > >
> >> > > > It seems to me like such a system would be valuable, but I'm wondering what other people in the open-source community think, which is why I was interested in starting this thread...
> >> > > >
> >> > > > Thanks for your feedback!
> >> > > >
> >> > > > -F
> >>
> >> --
> >> Thanks and regards
> >>
> >> Chinmay Soman
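For what it's worth, the "simple interfaces" Felix mentions in his list above map onto something like the sketch below. All of the names are hypothetical; this is just one reading of the integration surface such an ingestion system would need from a consistently-routed serving system: how keys map to partitions, which nodes own a partition, a local write path, and an optional metrics hook for throttling.

import java.util.List;

/**
 * Hypothetical integration surface a pull-based ingestion system could ask a
 * consistently-routed serving system to implement. Names are illustrative only.
 */
public interface ServingSystemAdapter {

    /** Key-to-partition assignment strategy, mirroring the producer-side partitioner. */
    int partitionForKey(byte[] key, int numPartitions);

    /** Partition-to-node assignment strategy: which serving nodes host a given partition. */
    List<String> nodesForPartition(int partition);

    /** Local write path used by the co-located consumer for each record. */
    void put(int partition, byte[] key, byte[] value);

    /**
     * Optional metrics hook for throttling (e.g. p99 read latency in milliseconds).
     * Returns a negative value if the serving system does not expose metrics.
     */
    default double p99ReadLatencyMillis() {
        return -1;
    }
}

With something like that in place, the consumer process itself stays generic: it consumes the partitions owned by the serving nodes on its host, issues local puts, and optionally uses the metrics hook for the throttling feedback loop discussed earlier in the thread.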
