Thanks for the great explanation, Felix!

On Thu, Apr 2, 2015 at 4:08 PM, Felix GV <[email protected]>
wrote:

> Hi Roger,
>
> The slow storage shard situation is indeed a concern.
>
> The slow storage shard will back up your pusher process for all shards if
> the incoming Kafka stream partitions don't line up. Alternatively, your
> pusher process will keep going with the healthy shards but will then need
> to re-consume the whole input stream just to push to the unhealthy shard
> after it recovered. LinkedIn already has (at least) one or two such (push)
> systems and they definitely work, but they have some operational
> limitations, as discussed here.
>
> Now, you can make the pusher process' input topic partitions line up with
> the storage partitions, and that would alleviate the slow shard problem,
> but if you go through the trouble of partitioning your input stream this
> way, then you're not really far off from the pull model anyway.
> Furthermore, even with lined up partitioning, the push approach will still
> have the following caveats compared to pull:
>
>   1.  An extra hop. The hop is probably not a big deal in terms of
> latency, but it does double your bandwidth requirements (and more than
> double if the storage system does not support batch operations which are as
> efficient as Kafka's).
>   2.  Smart throttling. You can throttle with a push approach, but only in
> a crude way. Likely the throttling would be a fixed QPS per topic or
> something like that. If you want to take into account the serving latency
> of the storage nodes, in order to back off when a storage node is under
> higher load, then it seems easier to achieve that accurately if you're
> co-located with the storage node, since you can poll its performance
> metrics more quickly and more often (see the sketch after this list).
>   3.  Multi-tenancy. If you want many different topics getting fed into
> the storage system, but with different priorities for each, then it may
> also be easier to prioritize the various streams against one another in
> the pull model. If your push process consumes partitions that line up with
> the storage, then technically you can achieve the same thing, but then
> your push process deployment and config become a bit more complex. The
> pushers are no longer just stateless, auto-balanced consumer processes;
> they need to be tied 1:1 or 1:M with the storage nodes. At this point,
> you've already gone about 80% of the way towards the pull model, so I
> would argue it makes things simpler to just go all the way.
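>
> To make point 2 a bit more concrete, here is a minimal sketch of what a
> co-located, latency-aware consumer loop could look like. This is only an
> illustration using the plain Java Kafka consumer; fetchLocalP99Ms() and
> putLocally() are placeholders for whatever metrics endpoint and local
> write API the serving node actually exposes, and "enriched-output" is a
> made-up topic name.
>
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class ThrottledLocalIngester {
>   private static final long P99_THRESHOLD_MS = 50; // back off above this
>
>   public static void main(String[] args) throws InterruptedException {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "local-ingester");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
>     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>       consumer.subscribe(Collections.singletonList("enriched-output"));
>       while (true) {
>         // Cheap to check often, since the metrics source is on the same host.
>         while (fetchLocalP99Ms() > P99_THRESHOLD_MS) {
>           Thread.sleep(100); // crude backoff while the node is under pressure
>         }
>         ConsumerRecords<byte[], byte[]> records =
>             consumer.poll(Duration.ofMillis(500));
>         for (ConsumerRecord<byte[], byte[]> record : records) {
>           putLocally(record.key(), record.value()); // local single put
>         }
>         consumer.commitSync(); // checkpoint offsets once the batch is applied
>       }
>     }
>   }
>
>   // Placeholder: would read a p99 read-latency gauge via JMX or a local
>   // stats endpoint of the serving process.
>   private static long fetchLocalP99Ms() { return 0; }
>
>   // Placeholder: would issue a put/insert against the co-located node.
>   private static void putLocally(byte[] key, byte[] value) { }
> }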
>
> Of course, there are definitely many ways to skin this cat. And some of the
> concerns above shouldn't be someone's highest priority if they're just
> getting started playing around with Samza. But I think it would be nice to
> have an open-source system available which "does all the right things", so
> that even newcomers to Samza can have an easy way to ingest data into their
> serving system.
>
> My 2 cents.
>
> --
>
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
>
> [email protected]
> linkedin.com/in/felixgv
>
> ________________________________________
> From: Roger Hoover [[email protected]]
> Sent: Thursday, April 02, 2015 3:24 PM
> To: [email protected]
> Subject: Re: How do you serve the data computed by Samza?
>
> Is it because the Kafka partitioning might not be the same as the storage
> partitioning? So that a slow storage shard will prevent unrelated shards
> from getting their messages?
>
> Ah, I think I see what you mean. If so, then the solution is to make the
> Kafka partitioning match the storage partitioning. In that case, push or
> pull is the same, yeah?
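>
> To illustrate what I mean by lining them up: something along the lines of
> the sketch below, where a custom producer partitioner simply delegates to
> the storage system's own key-to-shard function, so that Kafka partition i
> always holds exactly the keys owned by storage shard i. This is just a
> sketch against the Java producer's Partitioner interface;
> storageShardForKey() is a stand-in for whatever hash the serving system
> really uses.
>
> import java.util.Arrays;
> import java.util.Map;
> import org.apache.kafka.clients.producer.Partitioner;
> import org.apache.kafka.common.Cluster;
>
> public class StorageAlignedPartitioner implements Partitioner {
>
>   @Override
>   public int partition(String topic, Object key, byte[] keyBytes,
>                        Object value, byte[] valueBytes, Cluster cluster) {
>     Integer count = cluster.partitionCountForTopic(topic);
>     int numPartitions = (count == null) ? 1 : count;
>     // Delegate to the serving system's key routing so the two
>     // partitioning schemes can never drift apart.
>     return storageShardForKey(keyBytes, numPartitions);
>   }
>
>   // Placeholder for the serving system's actual key-to-shard hash.
>   private int storageShardForKey(byte[] keyBytes, int numShards) {
>     return Math.floorMod(Arrays.hashCode(keyBytes), numShards);
>   }
>
>   @Override public void configure(Map<String, ?> configs) { }
>   @Override public void close() { }
> }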
>
> Thanks,
>
> Roger
>
> On Thu, Apr 2, 2015 at 3:21 PM, Roger Hoover <[email protected]>
> wrote:
>
> > Chinmay,
> >
> > Thanks for your input.
> >
> > I'm not sure I understand what the difference is. With the design that
> > Felix laid out, the co-located Kafka consumer is still doing a push to the
> > storage system, right? It just happens to be on the same machine. How is
> > this different from pushing batches from a non-local Samza job? How does
> > the pull-based approach you're thinking of deal with feedback and SLAs?
> >
> > Thanks,
> >
> > Roger
> >
> >
> >
> > On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman <[email protected]>
> > wrote:
> >
> >> My 2 cents => one thing to note about the push model: multi-tenancy.
> >>
> >> When your storage system (Druid, for example) is used in a multi-tenant
> >> fashion, the push model is a bit difficult to operate, primarily because
> >> there is no real feedback loop from the storage system. Yes, if the
> >> storage system starts doing badly, then you get timeouts and higher
> >> latencies, but at that point you're already in a position where you're
> >> probably breaking SLAs (for some tenant).
> >>
> >> In that sense, a pull model might be better, since the consumer can
> >> potentially have more visibility into how this particular node is doing.
> >> Also, the Kafka consumer batches things up, so theoretically you could
> >> get similar throughput. The downside of this approach, of course, is that
> >> the storage system's partitioning scheme *has to* line up with the Kafka
> >> partitioning scheme.
> >>
> >> On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover <[email protected]>
> >> wrote:
> >>
> >> > Felix,
> >> >
> >> > I see your point about simple Kafka consumers. My thought was that if
> >> > you're already managing a Samza/YARN deployment then these types of jobs
> >> > would be "just another job" and not require an additional process
> >> > management/monitoring/operations setup. If you've already got a way to
> >> > handle vanilla Kafka jobs then it makes sense.
> >> >
> >> > For the push model, the way we're planning to deal with the latency of
> >> > round-trip calls is to batch up pushes to the downstream system. Both
> >> > Druid Tranquility and the ES transport node protocol allow you to batch
> >> > index requests. I'm curious whether pull would be that much more
> >> > efficient.
> >> >
> >> > Cheers,
> >> >
> >> > Roger
> >> >
> >> > On Wed, Apr 1, 2015 at 10:26 AM, Felix GV <[email protected]>
> >> > wrote:
> >> >
> >> > > Hi Roger,
> >> > >
> >> > > You bring up good points, and I think the short answer is that there
> >> > > are trade-offs to everything, of course (:
> >> > >
> >> > > What I described could definitely be implemented as a Samza job, and I
> >> > > think that would make a lot of sense if the data serving system was
> >> > > also deployed via YARN. This way, the Samza tasks responsible for
> >> > > ingesting and populating the data serving system's nodes could be
> >> > > spawned wherever YARN knows these nodes are located. For data serving
> >> > > systems not well integrated with YARN however, I'm not sure that there
> >> > > would be that much win in using the Samza deployment model. And since
> >> > > the consumers themselves are pretty simple (no joining of streams, no
> >> > > local state, etc.), this seems to be a case where Samza is a bit
> >> > > overkill and a regular Kafka consumer is perfectly fine (except for the
> >> > > YARN-enabled auto-deployment aspect, like I mentioned).
> >> > >
> >> > > As for push versus pull, I think the trade-off is the following: push
> >> > > is mostly simpler and more decoupled, as you said, but I think pull
> >> > > would be more efficient. The reason is that Kafka consumption is very
> >> > > efficient (thanks to batching and compression), but most data serving
> >> > > systems don't provide a streaming ingest API for pushing data
> >> > > efficiently to them; instead, they have single-record put/insert APIs
> >> > > which require a round-trip to be acknowledged. This is perfectly fine
> >> > > in low-throughput scenarios, but does not support the very high
> >> > > ingestion throughput that Kafka can provide. Co-locating the pulling
> >> > > process (i.e. the Kafka consumer) with the data serving node makes it a
> >> > > bit more affordable to do single puts, since the (local) round-trip
> >> > > acks would be near-instantaneous. Pulling also makes the tracking of
> >> > > offsets across different nodes a bit easier, since each node can
> >> > > consume at its own pace, and resume at whatever point in the past it
> >> > > needs (i.e. rewind) without affecting the other replicas. Tracking
> >> > > offsets across many replicas in the push model is a bit more annoying,
> >> > > though still doable, of course.
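> >> > >
> >> > > To sketch the per-node offset handling I have in mind (plain Java
> >> > > consumer with manual partition assignment; loadLocalOffset(),
> >> > > saveLocalOffset() and putLocally() are placeholders for whatever local
> >> > > checkpoint store and write API the serving node actually has, and the
> >> > > topic/partition names are made up):
> >> > >
> >> > > import java.time.Duration;
> >> > > import java.util.Collections;
> >> > > import java.util.Properties;
> >> > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> >> > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> > > import org.apache.kafka.common.TopicPartition;
> >> > >
> >> > > public class NodeLocalPuller {
> >> > >   public static void main(String[] args) {
> >> > >     Properties props = new Properties();
> >> > >     props.put("bootstrap.servers", "localhost:9092");
> >> > >     props.put("enable.auto.commit", "false"); // offsets live locally
> >> > >     props.put("key.deserializer",
> >> > >         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> >> > >     props.put("value.deserializer",
> >> > >         "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> >> > >
> >> > >     // This node owns the partitions that map to its shards, so it
> >> > >     // assigns them explicitly instead of joining a consumer group.
> >> > >     TopicPartition tp = new TopicPartition("enriched-output", 3);
> >> > >     try (KafkaConsumer<byte[], byte[]> consumer =
> >> > >         new KafkaConsumer<>(props)) {
> >> > >       consumer.assign(Collections.singletonList(tp));
> >> > >       consumer.seek(tp, loadLocalOffset(tp)); // resume or rewind freely
> >> > >       while (true) {
> >> > >         for (ConsumerRecord<byte[], byte[]> r :
> >> > >             consumer.poll(Duration.ofMillis(500))) {
> >> > >           putLocally(r.key(), r.value());      // cheap local put
> >> > >           saveLocalOffset(tp, r.offset() + 1); // this replica's own pace
> >> > >         }
> >> > >       }
> >> > >     }
> >> > >   }
> >> > >
> >> > >   // Placeholders for the node's local checkpoint store and write path.
> >> > >   private static long loadLocalOffset(TopicPartition tp) { return 0L; }
> >> > >   private static void saveLocalOffset(TopicPartition tp, long o) { }
> >> > >   private static void putLocally(byte[] k, byte[] v) { }
> >> > > }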
> >> > >
> >> > > --
> >> > >
> >> > > Felix GV
> >> > > Data Infrastructure Engineer
> >> > > Distributed Data Systems
> >> > > LinkedIn
> >> > >
> >> > > [email protected]
> >> > > linkedin.com/in/felixgv
> >> > >
> >> > > ________________________________________
> >> > > From: Roger Hoover [[email protected]]
> >> > > Sent: Tuesday, March 31, 2015 8:57 PM
> >> > > To: [email protected]
> >> > > Subject: Re: How do you serve the data computed by Samza?
> >> > >
> >> > > Ah, thanks for the great explanation. Any particular reason that the
> >> > > job(s) you described should not be Samza jobs?
> >> > >
> >> > > We've started experimenting with such jobs for Druid and Elasticsearch.
> >> > > For Elasticsearch, the Samza job containers join the Elasticsearch
> >> > > cluster as transport nodes and use the Java API to push to the ES data
> >> > > nodes. Likewise for Druid, the Samza job uses the Tranquility API to
> >> > > schedule jobs (
> >> > > https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
> >> > > ).
> >> > >
> >> > > The nice part about push versus pull is that the downstream system
> >> > > does not need plugins (like ES rivers) that may complicate its
> >> > > configuration or destabilize the system.
> >> > >
> >> > > Cheers,
> >> > >
> >> > > Roger
> >> > >
> >> > > On Tue, Mar 31, 2015 at 10:56 AM, Felix GV <[email protected]>
> >> > > wrote:
> >> > >
> >> > > > Thanks for your reply Roger! Very insightful (:
> >> > > >
> >> > > > > 6. If there was a highly-optimized and reliable way of ingesting
> >> > > > > partitioned streams quickly into your online serving system,
> would
> >> > that
> >> > > > > help you leverage Samza more effectively?
> >> > > >
> >> > > > >> 6. Can you elaborate please?
> >> > > >
> >> > > > Sure. The feature set I have in mind is the following:
> >> > > >
> >> > > > * Provide a thinly-wrapped Kafka producer which does appropriate
> >> > > > partitioning and includes useful metadata (such as production
> >> > > > timestamp, etc.) alongside the payload. This producer would be used
> >> > > > in the last step of processing of a Samza topology, in order to emit
> >> > > > to Kafka some processed/joined/enriched data which is destined for
> >> > > > online serving.
> >> > > > * Provide a consumer process which can be co-located on the same
> >> > > > hosts as your data serving system. This process consumes from the
> >> > > > appropriate partitions and checkpoints its offsets on its own. It
> >> > > > leverages Kafka batching and compression to make consumption very
> >> > > > efficient.
> >> > > > * For each record, the consumer process issues a put/insert locally
> >> > > > to the co-located serving process. Since this is a local operation,
> >> > > > it is also very cheap and efficient.
> >> > > > * The consumer process can also optionally throttle its insertion
> >> > > > rate by monitoring some performance metrics of the co-located data
> >> > > > serving process. For example, if the data serving process exposes a
> >> > > > p99 latency via JMX or other means, this can be used in a tight
> >> > > > feedback loop to back off if read latency degrades beyond a certain
> >> > > > threshold.
> >> > > > * This ingestion platform should be easy to integrate with any
> >> > > > consistently-routed data serving system, by implementing some simple
> >> > > > interfaces to let the ingestion system understand the
> >> > > > key-to-partition assignment strategy, as well as the
> >> > > > partition-to-node assignment strategy (a rough sketch of these
> >> > > > interfaces follows below). Optionally, a hook to access performance
> >> > > > metrics could also be implemented if throttling is deemed important
> >> > > > (as described in the previous point).
> >> > > > * Since the consumer runs in a separate process, the system benefits
> >> > > > from good isolation guarantees. The consumer process can be capped to
> >> > > > a low amount of heap, and its GC is inconsequential for the serving
> >> > > > platform. It's also possible to bounce the consumer and data serving
> >> > > > processes independently of each other, if need be.
> >> > > >
> >> > > > There are some more nuances and additional features which could be
> >> > > > nice to have, but that's the general idea.
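> >> > > >
> >> > > > To give a very rough idea, the integration surface mentioned in the
> >> > > > list above could be as small as something like the following sketch.
> >> > > > All of these names are made up purely for illustration; nothing here
> >> > > > is an existing API.
> >> > > >
> >> > > > // How the serving system maps a key to a partition/shard.
> >> > > > interface PartitionAssignmentStrategy {
> >> > > >   int partitionForKey(byte[] key, int numPartitions);
> >> > > > }
> >> > > >
> >> > > > // Which partitions a given serving node currently hosts, so the
> >> > > > // co-located consumer knows what to consume.
> >> > > > interface NodeAssignmentStrategy {
> >> > > >   java.util.Set<Integer> partitionsHostedBy(String nodeId);
> >> > > > }
> >> > > >
> >> > > > // Optional hook used for latency-aware throttling.
> >> > > > interface ServingNodeMetrics {
> >> > > >   double readLatencyP99Millis();
> >> > > > }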
> >> > > >
> >> > > >
> >> > > > It seems to me like such a system would be valuable, but I'm
> >> > > > wondering what other people in the open-source community think, which
> >> > > > is why I was interested in starting this thread...
> >> > > >
> >> > > >
> >> > > > Thanks for your feedback!
> >> > > >
> >> > > > -F
> >> > > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Thanks and regards
> >>
> >> Chinmay Soman
> >>
> >
> >
>
