Hi Steven, yep, thanks for adding that. When I wrote this I was double-checking the behavior of consumer groups to see if it was ever useful to have more consumers in a consumer group than partitions in the associated topic. Seems like the answer is no — partitions need to be allocated with an eye towards consumer throughput, not just producer throughput.
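As a rough illustration of why extra consumers do not help, here is a simplified model of Kafka-style consumer-group assignment. It is a sketch under stated assumptions, not Kafka's or Pulsar's actual assignor code. `assign_partitions` is a hypothetical helper that hands out partitions round-robin. The one rule it shares with real Kafka consumer groups is that a partition is owned by at most one consumer in the group at a time.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Round-robin partitions over the consumers of a single consumer group.

    Hypothetical simplified assignor: each partition goes to exactly one
    consumer, so once there are more consumers than partitions the extra
    consumers receive nothing.
    """
    ordered = sorted(consumers)
    assignment: Dict[str, List[int]] = {c: [] for c in ordered}
    if not ordered:
        return assignment
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return assignment


if __name__ == "__main__":
    # Four partitions, six consumers in one group: two consumers sit idle.
    result = assign_partitions([0, 1, 2, 3], ["c1", "c2", "c3", "c4", "c5", "c6"])
    idle = [c for c, parts in result.items() if not parts]
    print(result)
    print("idle consumers:", idle)
```

With 4 partitions and 6 consumers, `c5` and `c6` end up with empty assignments. Under this model, adding partitions raises the ceiling on consumer parallelism; adding consumers does not.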
> If you map a consumer with a partition, you will really suffer with
> operational issues.

I’m curious to hear a little more about this one. Are you simply saying that decoupling partitions and consumers has a bunch of Ops benefits, or are you saying that any time a consumer group with N consumers is reading from a topic with N partitions that's asking for trouble?

Pulsar / Kafka as a tool for change feed storage is a really interesting conversation. I’ve previously taken a close look at both of them for that use case and I do think there’s a lot of potential. I know Pulsar in particular was designed to solve that sort of problem at scale.

I had been presuming that this would be a “both/and” kind of thing; i.e., every CouchDB would still have a built-in _changes feed, but via Pulsar I/O or Kafka Connect you could easily wire that feed into the message bus for improved distribution, a broader ecosystem, better management, etc. When you start talking about replacing the internal feed altogether there are a number of complications:

- Transactional updates across both FoundationDB and Pulsar/Kafka are a hard problem
- Documents showing up “exactly once” is possible with topic compaction, but not transactional
- Internal consumers of the change feed, e.g. the secondary view system, would get more complicated

I like that I’m not the only one thinking these things go hand-in-hand, though :)

Adam

> On Mar 25, 2019, at 1:06 PM, Steven Le Roux <leroux.ste...@gmail.com> wrote:
>
> Hi Adam,
>
> Regarding the consumer scaling point, I just want to mention that
> Partitions are used in Kafka (or Pulsar) to scale a topic but not directly
> consumers. If you map a consumer with a partition, you will really suffer
> with operational issues.
>
> Consumers are managed in "consumer groups" that isolate them, which is a
> handy way to choose between a shared message queue and a pub/sub model.
> If you need consumers to read the same data, then you need them to have a
> different consumer group id. They're used to track distribution across
> consumers by assigning partitions to consumers. Only one consumer in a
> consumer group can subscribe to a given partition.
>
> While going distributed, I'd find the choice of relying on Pulsar or
> BookKeeper or Kafka as the distributed log very interesting.
>
> I think it would be very smart to provide two versions of CouchDB:
> - a classic standalone/master/slave
> - a distributed one with a FoundationDB layer for KV and a distributed log
> option for the feeds/changes.
>
>
> On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <kocol...@apache.org> wrote:
>
>> Hi all,
>>
>> Most of the discussions so far have focused on the core features that are
>> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
>> thought I’d start a thread on something a bit different: the _db_updates
>> feed.
>>
>> The _db_updates feed is an API that enables users to discover database
>> lifecycle events across an entire CouchDB instance. It’s primarily useful
>> in deployments that have lots and lots of databases, where it’s impractical
>> to keep connections open for every database, and where database creations
>> and deletions may be an automated aspect of the application’s use of
>> CouchDB.
>>
>> There are really two topics for discussion here. The first is: do we need
>> to keep it? The primary driver of applications creating lots of DBs is the
>> per-DB granularity of access controls; if we go down the route of
>> implementing the document-level _access proposal, perhaps users naturally
>> migrate away from this DB-per-user data model. I’d be curious to hear
>> points of view there.
>>
>> I’ll assume for now that we do want to keep it, and offer some thoughts on
>> how to implement it.
>> The main challenge with _db_updates is managing the
>> write contention; in write-heavy databases you have a lot of producers
>> trying to tag that particular database as “updated”, but all the consumer
>> really cares about is getting a single “dbname”:“updated” event as needed.
>> In the current architecture we try to dedupe a lot of the events in-memory
>> before updating a regular CouchDB database with this information, but this
>> leaves us exposed to possibly dropping events within a few-second window.
>>
>> ## Option 1: Queue + Compaction
>>
>> One way to tackle this in FoundationDB is to have an intermediate subspace
>> reserved as a queue. Each transaction that modifies a database would insert
>> a versionstamped KV into the queue, like
>>
>> Versionstamp = (DbName, EventType)
>>
>> Versionstamps are monotonically increasing and inserting versionstamped
>> keys is a conflict-free operation. We’d have a consumer of this queue which
>> is responsible for “log compaction”; i.e., the consumer would do range
>> reads on the queue subspace, toss out duplicate contiguous
>> “dbname”:“updated” events, and update a second index which would look more
>> like the _changes feed.
>>
>> ### Scaling Consumers
>>
>> A single consumer can likely process 10k events/sec or more, but
>> eventually we’ll need to scale. Borrowing from systems like Kafka, the
>> typical way to do this is to divide the queue into partitions and have
>> individual consumers mapped to each partition. A partition in this model
>> would just be a prefix on the Versionstamp:
>>
>> (PartitionID, Versionstamp) = (DbName, EventType)
>>
>> Our consumers will be more efficient and less likely to conflict with one
>> another on updating the _db_updates index if messages are keyed to a
>> partition based on DbName, although this still runs the risk that a couple
>> of high-throughput databases could swamp a partition.
>>
>> I’m not sure about the best path forward for handling that scenario.
>> One could implement a rate-limiter that starts sloughing off additional
>> messages for high-throughput databases (which has some tricky edge cases),
>> split the messages for a single database across multiple partitions, rely
>> on operators to blacklist certain databases from the _db_updates system,
>> etc. Each has downsides.
>>
>> ## Option 2: Atomic Ops + Consumer
>>
>> In this approach we still have an intermediate subspace, and a consumer of
>> that subspace which updates the _db_updates index. But this time, we have
>> at most one KV per database in the subspace, with an atomic counter for a
>> value. When a document is updated it bumps the counter for its database in
>> that subspace. So we’ll have entries like
>>
>> (“counters”, “db1235”) = 1
>> (“counters”, “db0001”) = 42
>> (“counters”, “telemetry-db”) = 12312
>>
>> and so on. Like versionstamps, atomic operations are conflict-free, so we
>> need not worry about introducing spurious conflicts on high-throughput
>> databases.
>>
>> The initial pass of the consumer logic would go something like this:
>>
>> - Do a snapshot range read of the “counters” subspace (or whatever we call
>> it)
>> - Record the current values for all counters in a separate summary KV
>> (you’ll see why in a minute)
>> - Do a limit=1 range read on the _changes space for each DB in the list to
>> grab the latest Sequence
>> - Update the _db_updates index with the latest Sequence for each of these
>> databases
>>
>> On a second pass, the consumer would read the summary KV from the last
>> pass and compare the previous counters with the current values. If any
>> counters have not been updated in the interval, the consumer would try to
>> clear those from the “counters” subspace (adding them as explicit conflict
>> keys to ensure we don’t miss a concurrent update). It would then proceed
>> with the rest of the logic from the initial pass.
>> This is a careful balancing act:
>>
>> - We don’t want to pollute the “counters” subspace with idle databases
>> because each entry requires an extra read of _changes
>> - We don’t want to attempt to clear counters that are constantly updated
>> because that’s going to fail with a conflict every time
>>
>> The scalability axis here is the number of databases updated within any
>> short window of time (~1 second or less). If we end up with that number
>> growing large we can have consumers responsible for ranges of the “counters”
>> subspace, though I think that’s less likely than in the queue-based design.
>>
>> I don’t know in detail what optimizations FoundationDB applies to atomic
>> operations (e.g. coalescing them at a layer above the storage engine).
>> That’s worth checking into, as otherwise I’d be concerned about introducing
>> super-hot keys here.
>>
>> This option does not handle the “created” and “deleted” lifecycle events
>> for each database, but those are quite simple and could be
>> inserted directly into the _db_updates index.
>>
>> ===
>>
>> There are some additional details which can be fleshed out in an RFC, but
>> this is the basic gist of things. Both designs would be more robust at
>> capturing every single updated database (because the enqueue/increment
>> operation would be part of the document update transaction). They would
>> allow for a small delay between the document update and the appearance of
>> the database in _db_updates, which is no different from what we have today.
>> They each require a background process.
>>
>> Let’s hear what you think, both about the interest level for this feature
>> and any comments on the designs. I may take this one over to the FDB forums
>> as well for feedback.
>>
>> Cheers,
>>
>> Adam
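For readers who want to see Option 1 end to end, the following is a minimal in-memory sketch in Python. It is not code against the FoundationDB bindings, and everything in it is an assumption for illustration:

- A dict plays the queue subspace keyed by `(PartitionID, Versionstamp)`.
- A global counter plays the role of versionstamps.
- `zlib.crc32` is an arbitrary choice of stable hash for keying partitions on DbName.
- `QueueSim`, `DbUpdatesIndex` and `compact_partition` are hypothetical names.

The compaction step collapses contiguous duplicate events within a partition. It then maintains a `_changes`-like index in which each database appears once, at its newest sequence.

```python
import zlib
from itertools import count
from typing import Dict, List, Tuple

# Hypothetical in-memory model of Option 1; no FoundationDB bindings are used.
QueueKey = Tuple[int, int]  # (PartitionID, Versionstamp)
Event = Tuple[str, str]     # (DbName, EventType)


class QueueSim:
    """Stand-in for the queue subspace: (PartitionID, Versionstamp) = (DbName, EventType)."""

    def __init__(self, num_partitions: int) -> None:
        self.num_partitions = num_partitions
        self.entries: Dict[QueueKey, Event] = {}
        self._versionstamps = count(1)  # monotonically increasing, like FDB versionstamps

    def partition_for(self, db_name: str) -> int:
        # Stable hash (unlike Python's salted str hash) so a database always maps to one partition.
        return zlib.crc32(db_name.encode("utf-8")) % self.num_partitions

    def enqueue(self, db_name: str, event_type: str) -> None:
        # In FDB this would be a versionstamped-key write inside the document-update transaction.
        key = (self.partition_for(db_name), next(self._versionstamps))
        self.entries[key] = (db_name, event_type)

    def range_read(self, partition: int) -> List[Tuple[int, str, str]]:
        # All queued events for one partition, in versionstamp order.
        return sorted(
            (vs, db, ev) for (p, vs), (db, ev) in self.entries.items() if p == partition
        )


class DbUpdatesIndex:
    """_changes-like index: each database appears once, at its latest sequence."""

    def __init__(self) -> None:
        self.seq_by_db: Dict[str, int] = {}
        self.event_by_seq: Dict[int, Event] = {}

    def record(self, seq: int, db_name: str, event_type: str) -> None:
        old = self.seq_by_db.pop(db_name, None)
        if old is not None:
            del self.event_by_seq[old]  # move the database to its new position
        self.seq_by_db[db_name] = seq
        self.event_by_seq[seq] = (db_name, event_type)

    def feed(self) -> List[Event]:
        return [self.event_by_seq[s] for s in sorted(self.event_by_seq)]


def compact_partition(queue: QueueSim, index: DbUpdatesIndex, partition: int) -> int:
    """Consume one partition: collapse contiguous duplicates, update the index, clear the queue."""
    batch = queue.range_read(partition)
    kept: List[Tuple[int, str, str]] = []
    for vs, db, ev in batch:
        if kept and kept[-1][1:] == (db, ev):
            kept[-1] = (vs, db, ev)  # keep only the newest event of a contiguous run
        else:
            kept.append((vs, db, ev))
    for vs, db, ev in kept:
        index.record(vs, db, ev)
    for vs, _, _ in batch:
        del queue.entries[(partition, vs)]
    return len(kept)


if __name__ == "__main__":
    queue, index = QueueSim(num_partitions=4), DbUpdatesIndex()
    for db in ["db1", "db1", "db1", "db2", "db1"]:
        queue.enqueue(db, "updated")
    for p in range(queue.num_partitions):
        compact_partition(queue, index, p)
    print(index.feed())  # db2 first, then db1 at its newest versionstamp
```

In a real deployment, the range read, the index update and the clear of consumed queue entries would presumably share one transaction. That way a crashed consumer could not drop or double-apply a batch. The sketch leaves that out.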
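Option 2's two-pass consumer can be sketched the same way. Again, this is a hedged, in-memory illustration, not FoundationDB code:

- Plain dicts stand in for the “counters” subspace, the summary KV, each database's `_changes` and the `_db_updates` index.
- `CounterStore`, `consumer_pass` and `clear_if_unchanged` are hypothetical names.
- The compare-and-clear in `clear_if_unchanged` models the explicit conflict key from the proposal. In FoundationDB, a concurrent atomic add would make the clearing transaction conflict at commit rather than return `False`.

```python
from typing import Dict, Optional


class CounterStore:
    """Hypothetical in-memory stand-in for the FoundationDB subspaces in Option 2."""

    def __init__(self) -> None:
        self.counters: Dict[str, int] = {}             # ("counters", DbName) -> atomic counter
        self.latest_seq: Dict[str, int] = {}           # what a limit=1 read of each _changes returns
        self.summary: Optional[Dict[str, int]] = None  # counter values recorded by the previous pass
        self.db_updates: Dict[str, int] = {}           # _db_updates index: DbName -> latest Sequence
        self._seq = 0

    def update_document(self, db_name: str) -> None:
        # Done inside the document-update transaction: new _changes entry plus an atomic add.
        self._seq += 1
        self.latest_seq[db_name] = self._seq
        self.counters[db_name] = self.counters.get(db_name, 0) + 1

    def clear_if_unchanged(self, db_name: str, expected: int) -> bool:
        # Models a clear with the key added as an explicit conflict key: if a
        # concurrent update bumped the counter, that commit would conflict.
        if self.counters.get(db_name) == expected:
            del self.counters[db_name]
            return True
        return False


def consumer_pass(store: CounterStore) -> None:
    snapshot = dict(store.counters)  # snapshot range read of the "counters" subspace
    if store.summary is not None:
        # Later passes: clear counters that have not moved since the last pass.
        for db_name, value in list(snapshot.items()):
            if store.summary.get(db_name) == value and store.clear_if_unchanged(db_name, value):
                del snapshot[db_name]
    store.summary = dict(snapshot)  # recorded for comparison on the next pass
    for db_name in snapshot:
        # One _changes read per database still present in the subspace.
        store.db_updates[db_name] = store.latest_seq[db_name]


if __name__ == "__main__":
    store = CounterStore()
    store.update_document("db1")
    store.update_document("db2")
    consumer_pass(store)  # first pass: both databases indexed
    store.update_document("db2")
    consumer_pass(store)  # db1 was idle, so its counter is cleared
    print(store.counters, store.db_updates)
```

The sketch makes the trade-off in the bullet list visible. A database leaves the “counters” subspace only after a full pass with no updates. Until then, any database still in the subspace costs one `_changes` read per pass.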