Hi Adam,

Regarding the consumer scaling point, I just want to mention that partitions are used in Kafka (or Pulsar) to scale a topic, not consumers directly. If you map a consumer to a partition, you will really suffer from operational issues.
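As a minimal sketch of what that looks like in practice with the kafka-python client (the topic name, group ids and broker address here are just placeholders of mine), a consumer never picks a partition itself; it declares a group and the broker assigns partitions within that group:

    from kafka import KafkaConsumer

    # Every process started with the same group_id shares the topic's
    # partitions; the broker assigns and rebalances them automatically,
    # so no consumer is ever pinned to a partition by hand.
    consumer = KafkaConsumer("db-updates",
                             group_id="db-updates-compactors",
                             bootstrap_servers="localhost:9092")

    # A process using a different group_id (say "db-updates-auditors")
    # would receive its own full copy of the stream, i.e. pub/sub rather
    # than work sharing.
    for msg in consumer:
        print(msg.partition, msg.offset, msg.value)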
Consumers are managed in "consumer groups" that isolate them from one another, which is a handy way to choose between a shared message queue and a pub/sub model. If you need several consumers to read the same data, then you need to give them different consumer group ids. Groups are used to distribute the work across consumers by assigning partitions to them; only one consumer in a consumer group can subscribe to a given partition.

While going distributed, I'd find the choice of relying on Pulsar or BookKeeper or Kafka as the distributed log very interesting. I would find it very smart to provide two versions of CouchDB:
- a classic standalone/master/slave one
- a distributed one, with a FoundationDB layer for the KV store and a distributed log option for the feeds/changes

On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <kocol...@apache.org> wrote:
> Hi all,
>
> Most of the discussions so far have focused on the core features that are fundamental to CouchDB: JSON documents, revision tracking, _changes. I thought I’d start a thread on something a bit different: the _db_updates feed.
>
> The _db_updates feed is an API that enables users to discover database lifecycle events across an entire CouchDB instance. It’s primarily useful in deployments that have lots and lots of databases, where it’s impractical to keep connections open for every database, and where database creations and deletions may be an automated aspect of the application’s use of CouchDB.
>
> There are really two topics for discussion here. The first is: do we need to keep it? The primary driver of applications creating lots of DBs is the per-DB granularity of access controls; if we go down the route of implementing the document-level _access proposal perhaps users naturally migrate away from this DB-per-user data model. I’d be curious to hear points of view there.
>
> I’ll assume for now that we do want to keep it, and offer some thoughts on how to implement it. The main challenge with _db_updates is managing the write contention; in write-heavy databases you have a lot of producers trying to tag that particular database as “updated”, but all the consumer really cares about is getting a single “dbname”:“updated” event as needed. In the current architecture we try to dedupe a lot of the events in-memory before updating a regular CouchDB database with this information, but this leaves us exposed to possibly dropping events within a few-second window.
>
> ## Option 1: Queue + Compaction
>
> One way to tackle this in FoundationDB is to have an intermediate subspace reserved as a queue. Each transaction that modifies a database would insert a versionstamped KV into the queue like
>
> Versionstamp = (DbName, EventType)
>
> Versionstamps are monotonically increasing and inserting versionstamped keys is a conflict-free operation. We’d have a consumer of this queue which is responsible for “log compaction”; i.e., the consumer would do range reads on the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, and update a second index which would look more like the _changes feed.
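For what it's worth, here is roughly how I picture both sides of that queue with the FoundationDB Python bindings; the subspace name, event strings, and the elided index update are placeholders of mine, not part of the proposal:

    import fdb
    import fdb.tuple

    fdb.api_version(610)
    db = fdb.open()

    @fdb.transactional
    def enqueue_db_event(tr, db_name, event_type):
        # The key carries an incomplete versionstamp that FDB fills in at
        # commit time, so concurrent producers never conflict on this insert.
        key = fdb.tuple.pack_with_versionstamp(
            ("db_updates_queue", fdb.tuple.Versionstamp()))
        tr.set_versionstamped_key(key, fdb.tuple.pack((db_name, event_type)))

    @fdb.transactional
    def compact_queue(tr):
        # Consumer / "log compaction" side: range-read the queue in order,
        # skip duplicate contiguous events for the same database, and feed
        # the survivors to the _db_updates index (elided here). A real
        # consumer would chunk this work to stay inside transaction limits.
        rng = fdb.tuple.range(("db_updates_queue",))
        last_seen = None
        for kv in tr.get_range(rng.start, rng.stop, limit=1000):
            event = fdb.tuple.unpack(kv.value)   # (db_name, event_type)
            if event != last_seen:
                last_seen = event
                # update_db_updates_index(tr, *event)  # hypothetical helper
            tr.clear(kv.key)

    enqueue_db_event(db, "telemetry-db", "updated")
    compact_queue(db)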
> ### Scaling Consumers
>
> A single consumer can likely process 10k events/sec or more, but eventually we’ll need to scale. Borrowing from systems like Kafka, the typical way to do this is to divide the queue into partitions and have individual consumers mapped to each partition. A partition in this model would just be a prefix on the Versionstamp:
>
> (PartitionID, Versionstamp) = (DbName, EventType)
>
> Our consumers will be more efficient and less likely to conflict with one another on updating the _db_updates index if messages are keyed to a partition based on DbName, although this still runs the risk that a couple of high-throughput databases could swamp a partition.
>
> I’m not sure about the best path forward for handling that scenario. One could implement a rate-limiter that starts sloughing off additional messages for high-throughput databases (which has some tricky edge cases), split the messages for a single database across multiple partitions, rely on operators to blacklist certain databases from the _db_updates system, etc. Each has downsides.
>
> ## Option 2: Atomic Ops + Consumer
>
> In this approach we still have an intermediate subspace, and a consumer of that subspace which updates the _db_updates index. But this time, we have at most one KV per database in the subspace, with an atomic counter for a value. When a document is updated it bumps the counter for its database in that subspace. So we’ll have entries like
>
> (“counters”, “db1235”) = 1
> (“counters”, “db0001”) = 42
> (“counters”, “telemetry-db”) = 12312
>
> and so on. Like versionstamps, atomic operations are conflict-free so we need not worry about introducing spurious conflicts on high-throughput databases.
>
> The initial pass of the consumer logic would go something like this:
>
> - Do a snapshot range read of the “counters” subspace (or whatever we call it)
> - Record the current values for all counters in a separate summary KV (you’ll see why in a minute)
> - Do a limit=1 range read on the _changes space for each DB in the list to grab the latest Sequence
> - Update the _db_updates index with the latest Sequence for each of these databases
>
> On a second pass, the consumer would read the summary KV from the last pass and compare the previous counters with the current values. If any counters have not been updated in the interval, the consumer would try to clear those from the “counters” subspace (adding them as explicit conflict keys to ensure we don’t miss a concurrent update). It would then proceed with the rest of the logic from the initial pass. This is a careful balancing act:
>
> - We don’t want to pollute the “counters” subspace with idle databases because each entry requires an extra read of _changes
> - We don’t want to attempt to clear counters that are constantly updated because that’s going to fail with a conflict every time
>
> The scalability axis here is the number of databases updated within any short window of time (~1 second or less). If we end up with that number growing large we can have consumers responsible for a range of the “counters” subspace, though I think that’s less likely than in the queue-based design.
>
> I don’t know in detail what optimizations FoundationDB applies to atomic operations (e.g. coalescing them at a layer above the storage engine). That’s worth checking into, as otherwise I’d be concerned about introducing super-hot keys here.
>
> This option does not handle the “created” and “deleted” lifecycle events for each database, but those are really quite simple and could be inserted directly into the _db_updates index.
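Again a rough sketch from my side of the counter bump and the two consumer passes described above, with the key names and the elided _changes/_db_updates reads being placeholders:

    import struct
    import fdb
    import fdb.tuple

    fdb.api_version(610)
    db = fdb.open()
    ONE = struct.pack("<q", 1)

    @fdb.transactional
    def bump_counter(tr, db_name):
        # Rides in the document-update transaction; atomic, hence conflict-free.
        tr.add(fdb.tuple.pack(("counters", db_name)), ONE)

    @fdb.transactional
    def consumer_pass(tr, previous):
        # previous: counters recorded on the last pass (the proposal keeps
        # these in a summary KV; a plain dict keeps the sketch short).
        current = {}
        rng = fdb.tuple.range(("counters",))
        # Snapshot read so the consumer never conflicts with the producers.
        for kv in tr.snapshot.get_range(rng.start, rng.stop):
            _, db_name = fdb.tuple.unpack(kv.key)
            current[db_name] = struct.unpack("<q", kv.value)[0]
            # ... limit=1 range read on this database's _changes subspace for
            # its latest Sequence, then update the _db_updates index (elided)
        for db_name, count in previous.items():
            if current.get(db_name) == count:
                # Idle since the last pass: clear it, but declare a read
                # conflict so a concurrent bump makes this transaction retry
                # instead of silently losing the update.
                key = fdb.tuple.pack(("counters", db_name))
                tr.add_read_conflict_key(key)
                tr.clear(key)
                current.pop(db_name, None)
        return current  # becomes `previous` on the next pass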
> ===
>
> There are some additional details which can be fleshed out in an RFC, but this is the basic gist of things. Both designs would be more robust at capturing every single updated database (because the enqueue/increment operation would be part of the document update transaction). They would allow for a small delay between the document update and the appearance of the database in _db_updates, which is no different from what we have today. They each require a background process.
>
> Let’s hear what you think, both about the interest level for this feature and any comments on the designs. I may take this one over to the FDB forums as well for feedback. Cheers,
>
> Adam
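One last illustration of the robustness point above: because the enqueue/increment is part of the document-update transaction, the write path could look roughly like this, reusing the hypothetical helpers from my earlier snippets (the document key layout below is equally made up):

    @fdb.transactional
    def update_document(tr, db_name, doc_id, doc_body):
        # Purely illustrative document write; the real key/value layout is
        # defined elsewhere in the FoundationDB layer proposals.
        tr[fdb.tuple.pack(("docs", db_name, doc_id))] = doc_body
        # Option 1: enqueue_db_event(tr, db_name, "updated")
        # Option 2: bump_counter(tr, db_name)
        # Either way the event commits atomically with the document update
        # or not at all, so _db_updates never silently misses a database.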