>> If you map a consumer with a partition, you will really suffer with
operational issues.

> I’m curious to hear a little more about this one. Are you simply saying
that decoupling partitions and consumers has a bunch of Ops benefits, or
are you saying that any time a consumer group with N consumers is reading
from a topic with N partitions that's asking for trouble?

Actually, what you're looking for is how much groups you will need. The
real dynamic aspect of consumption here is binding a subscriber to a
consumer group so that each subscriber will see the same update feed. It's
not a ideal design for multi tenancy though.

Partitions in the Kafka way to see them is not something very light or
logical since they define physical directories where files are written. You
can have hundreds of them, maybe few thousands, but they come with
drawbacks :
 - if you want a customer per partition, you need the producer to push the
data to all partitions so that all consumers will the the same
 - partitions number can be resized, but carefully since it implies
capacity planning and disk space availability given the data distribution.
 - resizing is a sensible operation, if you add partitions, just ensure
disk availability on brokers that will welcome new ones, but if you size
them down, you may encounter issues with ordering of your transactions.
 - partitions are filled by producers based on a partitionner. If you don't
achieve a good distribution among partitions, you may encounter situations
where a disk is full when others are empty.

To me partitions are an internal aspect to Kafka to make it scale
horizontaly, and it can be related to the consumper scalability you're
looking for. Also a big difference is how you may want to expose an update
feed (subscribing from now, or catching up to a previous state).

For many of the above aspects, even if Kafka remains a great piece of art,
Pulsar is doing a better job as a distributed log. Based on Bookkeeper,
data are split into segments that are more fine grained than Partitions and
easier to manage/move etc. Still they're not more usefull for consumers and
Pulsar have Subscriptions where Kafka have Consumer Groups. Consumers are
just a way to scale the consumption ( read more here
http://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#subscription-modes
)

I hope it helps :)


On Mon, Mar 25, 2019 at 7:41 PM Adam Kocoloski <kocol...@apache.org> wrote:

> Hi Steven, yep, thanks for adding that. When I wrote this I was
> double-checking the behavior of consumer groups to see if it was ever
> useful to have more consumers in a consumer group than partitions in the
> associated topic. Seems like the answer is no — partitions need to be
> allocated with an eye towards consumer throughput, not just producer
> throughput.
>
> > If you map a consumer with a partition, you will really suffer with
> operational issues.
>
> I’m curious to hear a little more about this one. Are you simply saying
> that decoupling partitions and consumers has a bunch of Ops benefits, or
> are you saying that any time a consumer group with N consumers is reading
> from a topic with N partitions that's asking for trouble?
>
> Pulsar / Kafka as a tool for change feed storage is a really interesting
> conversation. I’ve previously taken a close look at both of them for that
> use case and I do think there’s a lot of potential. I know Pulsar in
> particular was designed to solve that sort of problem at scale. I had been
> presuming that this would be a “both/and” kind of thing; i.e., every
> CouchDB would still have a built-in _changes feed, but via Pulsar I/O or
> Kafka Connect you could easily wire that feed into the message bus for
> improved distribution, a broader ecosystem, better management, etc. When
> you start talking about replacing the internal feed altogether there are a
> number of complications:
>
> - Transactional updates across both FoundationDB and Pulsar/Kafka is a
> hard problem
> - Documents showing up "exactly once” is possible with topic compaction,
> but not transactional
> - Internal consumers of the change feed, e.g. the secondary view system
> would get more complicated
>
> I like that I’m not the only one thinking these things go hand-in-hand,
> though :)
>
> Adam
>
> > On Mar 25, 2019, at 1:06 PM, Steven Le Roux <leroux.ste...@gmail.com>
> wrote:
> >
> > Hi Adam,
> >
> > Regarding the consumer scaling point, I just want to mention that
> > Partitions are used in Kafka (or Pulsar) to scale a topic but not
> directly
> > consumers. If you map a consumer with a partition, you will really suffer
> > with operational issues.
> >
> > Consumers are managed in "consumer groups" that isolate them which is a
> > handy way to choose between a shared message queue to a pub/sub model. If
> > you need consumers to read the same data, then you need them to have a
> > different consumer group id. They're used to track distribution accross
> > consumers by assigning partitions to consumers. Only one consumer in a
> > consumer group can subscribe a given partition.
> >
> > While going distributed, I'd find the choice of relying on Pulsar or
> > Bookkeeper or Kafka as the distributed Log very interesting.
> >
> > I would find very smart to provide two version of CouchDB :
> > - a classic standalone/master/slave
> > - a distributed with FoundationDB layer for KV and a distributed Log
> > option for the feeds/changes.
> >
> >
> > On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <kocol...@apache.org>
> wrote:
> >
> >> Hi all,
> >>
> >> Most of the discussions so far have focused on the core features that
> are
> >> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
> >> thought I’d start a thread on something a bit different: the _db_updates
> >> feed.
> >>
> >> The _db_updates feed is an API that enables users to discover database
> >> lifecycle events across an entire CouchDB instance. It’s primarily
> useful
> >> in deployments that have lots and lots of databases, where it’s
> impractical
> >> to keep connections open for every database, and where database
> creations
> >> and deletions may be an automated aspect of the application’s use of
> >> CouchDB.
> >>
> >> There are really two topics for discussion here. The first is: do we
> need
> >> to keep it? The primary driver of applications creating lots of DBs is
> the
> >> per-DB granularity of access controls; if we go down the route of
> >> implementing the document-level _access proposal perhaps users naturally
> >> migrate away from this DB-per-user data model. I’d be curious to hear
> >> points of view there.
> >>
> >> I’ll assume for now that we do want to keep it, and offer some thoughts
> on
> >> how to implement it. The main challenge with _db_updates is managing the
> >> write contention; in write-heavy databases you have a lot of producers
> >> trying to tag that particular database as “updated", but all the
> consumer
> >> really cares about is getting a single “dbname”:”updated” event as
> needed.
> >> In the current architecture we try to dedupe a lot of the events
> in-memory
> >> before updating a regular CouchDB database with this information, but
> this
> >> leaves us exposed to possibly dropping events within a few second
> window.
> >>
> >> ## Option 1: Queue + Compaction
> >>
> >> One way to tackle this in FoundationDB is to have an intermediate
> subspace
> >> reserved as a queue. Each transaction that modifies a database would
> insert
> >> a versionstamped KV into the queue like
> >>
> >> Versionstamp = (DbName, EventType)
> >>
> >> Versionstamps are monotonically increasing and inserting versionstamped
> >> keys is a conflict-free operation. We’d have a consumer of this queue
> which
> >> is responsible for “log compaction”; i.e., the consumer would do range
> >> reads on the queue subspace, toss out duplicate contiguous
> >> “dbname”:“updated” events, and update a second index which would look
> more
> >> like the _changes feed.
> >>
> >> ### Scaling Consumers
> >>
> >> A single consumer can likely process 10k events/sec or more, but
> >> eventually we’ll need to scale. Borrowing from systems like Kafka the
> >> typical way to do this is to divide the queue into partitions and have
> >> individual consumers mapped to each partition. A partition in this model
> >> would just be a prefix on the Versionstamp:
> >>
> >> (PartitionID, Versionstamp) = (DbName, EventType)
> >>
> >> Our consumers will be more efficient and less likely to conflict with
> one
> >> another on updating the _db_updates index if messages are keyed to a
> >> partition based on DbName, although this still runs the risk that a
> couple
> >> of high-throughput databases could swamp a partition.
> >>
> >> I’m not sure about the best path forward for handling that scenario. One
> >> could implement a rate-limiter that starts sloughing off additional
> >> messages for high-throughput databases (which has some careful edge
> cases),
> >> split the messages for a single database across multiple partitions,
> rely
> >> on operators to blacklist certain databases from the _db_updates system,
> >> etc. Each has downsides.
> >>
> >> ## Option 2: Atomic Ops + Consumer
> >>
> >> In this approach we still have an intermediate subspace, and a consumer
> of
> >> that subspace which updates the _db_updates index. But this time, we
> have
> >> at most one KV per database in the subspace, with an atomic counter for
> a
> >> value. When a document is updated it bumps the counter for its database
> in
> >> that subspace. So we’ll have entries like
> >>
> >> (“counters”, “db1235”) = 1
> >> (“counters”, “db0001”) = 42
> >> (“counters”, “telemetry-db”) = 12312
> >>
> >> and so on. Like versionstamps, atomic operations are conflict-free so we
> >> need not worry about introducing spurious conflicts on high-throughput
> >> databases.
> >>
> >> The initial pass of the consumer logic would go something like this:
> >>
> >> - Do a snapshot range read of the “counters” subspace (or whatever we
> call
> >> it)
> >> - Record the current values for all counters in a separate summary KV
> >> (you’ll see why in a minute)
> >> - Do a limit=1 range read on the _changes space for each DB in the list
> to
> >> grab the latest Sequence
> >> - Update the _db_updates index with the latest Sequence for each of
> these
> >> databases
> >>
> >> On a second pass, the consumer would read the summary KV from the last
> >> pass and compare the previous counters with the current values. If any
> >> counters have not been updated in the interval, the consumer would try
> to
> >> clear those from the “counters” subspace (adding them as explicit
> conflict
> >> keys to ensure we don’t miss a concurrent update). It would then proceed
> >> with the rest of the logic from the initial pass. This is a careful
> >> balancing act:
> >>
> >> - We don’t want to pollute the “counters” subspace with idle databases
> >> because each entry requires an extra read of _changes
> >> - We don’t want to attempt to clear counters that are constantly updated
> >> because that’s going to fail with a conflict every time
> >>
> >> The scalability axis here is the number of databases updated within any
> >> short window of time (~1 second or less). If we end up with that number
> >> growing large we can have consumers responsible for range of the
> “counters”
> >> subspace, though I think that’s less likely than in the queue-based
> design.
> >>
> >> I don’t know in detail what optimizations FoundationDB applies to atomic
> >> operations (e.g. coalescing them at a layer above the storage engine).
> >> That’s worth checking into, as otherwise I’d be concerned about
> introducing
> >> super-hot keys here.
> >>
> >> This option does not handle the “created” and “deleted” lifecycle events
> >> for each database, but those are really quite simple and could really be
> >> inserted directly into the _db_updates index.
> >>
> >> ===
> >>
> >> There are some additional details which can be fleshed out in an RFC,
> but
> >> this is the basic gist of things. Both designs would be more robust at
> >> capturing every single updated database (because the enqueue/increment
> >> operation would be part of the document update transaction). They would
> >> allow for a small delay between the document update and the appearance
> of
> >> the database in _db_updates, which is no different than we have today.
> They
> >> each require a background process.
> >>
> >> Let’s hear what you think, both about the interest level for this
> feature
> >> and any comments on the designs. I may take this one over to the FDB
> forums
> >> as well for feedback. Cheers,
> >>
> >> Adam
>
>

Reply via email to