Hi Adam,

Regarding the consumer scaling point, I just want to mention that
partitions are used in Kafka (or Pulsar) to scale a topic, not consumers
directly. If you tie each consumer to a specific partition, you will
really suffer from operational issues.

Consumers are managed in "consumer groups", which isolate them from each
other and give you a handy way to choose between a shared message queue
and a pub/sub model. If you need several consumers to read the same data,
give them different consumer group ids. The group is what tracks
distribution across consumers by assigning partitions to them, and only
one consumer in a consumer group can subscribe to a given partition.
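
For example, with the kafka-python client (the topic name and broker
address below are just placeholders): processes started with the same
group_id split the topic's partitions between them, while a process with
a different group_id receives its own full copy of the stream.

    # Illustrative sketch using the kafka-python client; the topic name
    # and broker address are made up.
    from kafka import KafkaConsumer

    # Every process started with group_id="indexer" shares the topic's
    # partitions (shared-queue semantics). A process started with a
    # different group_id (say "audit") receives every message as well
    # (pub/sub semantics).
    consumer = KafkaConsumer(
        "db-updates",
        bootstrap_servers="localhost:9092",
        group_id="indexer",
        auto_offset_reset="earliest",
    )

    for record in consumer:
        print(record.partition, record.key, record.value)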

For the distributed version, I would find the choice of relying on
Pulsar, BookKeeper, or Kafka as the distributed log very interesting.

I would find it very smart to provide two versions of CouchDB:
 - a classic standalone/master/slave version
 - a distributed version with a FoundationDB layer for KV storage and a
distributed log option for the feeds/changes.


On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <kocol...@apache.org> wrote:

> Hi all,
>
> Most of the discussions so far have focused on the core features that are
> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
> thought I’d start a thread on something a bit different: the _db_updates
> feed.
>
> The _db_updates feed is an API that enables users to discover database
> lifecycle events across an entire CouchDB instance. It’s primarily useful
> in deployments that have lots and lots of databases, where it’s impractical
> to keep connections open for every database, and where database creations
> and deletions may be an automated aspect of the application’s use of
> CouchDB.
>
> There are really two topics for discussion here. The first is: do we need
> to keep it? The primary driver of applications creating lots of DBs is the
> per-DB granularity of access controls; if we go down the route of
> implementing the document-level _access proposal perhaps users naturally
> migrate away from this DB-per-user data model. I’d be curious to hear
> points of view there.
>
> I’ll assume for now that we do want to keep it, and offer some thoughts on
> how to implement it. The main challenge with _db_updates is managing the
> write contention; in write-heavy databases you have a lot of producers
> trying to tag that particular database as “updated”, but all the consumer
> really cares about is getting a single “dbname”:“updated” event as needed.
> In the current architecture we try to dedupe a lot of the events in-memory
> before updating a regular CouchDB database with this information, but this
> leaves us exposed to possibly dropping events within a window of a few seconds.
>
> ## Option 1: Queue + Compaction
>
> One way to tackle this in FoundationDB is to have an intermediate subspace
> reserved as a queue. Each transaction that modifies a database would insert
> a versionstamped KV into the queue like
>
> Versionstamp = (DbName, EventType)
>
> Versionstamps are monotonically increasing and inserting versionstamped
> keys is a conflict-free operation. We’d have a consumer of this queue which
> is responsible for “log compaction”; i.e., the consumer would do range
> reads on the queue subspace, toss out duplicate contiguous
> “dbname”:“updated” events, and update a second index which would look more
> like the _changes feed.
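>
> A rough sketch of both sides with the FoundationDB Python bindings (the
> subspace names, value encoding, and index layout here are illustrative,
> not a proposed final design):
>
>     import fdb
>     import fdb.tuple
>
>     fdb.api_version(630)
>     db = fdb.open()
>
>     queue = fdb.Subspace(('db_updates_queue',))
>     index = fdb.Subspace(('db_updates_index',))
>
>     @fdb.transactional
>     def enqueue_event(tr, db_name, event_type):
>         # Versionstamps are assigned by the cluster at commit time, so
>         # concurrent document updates never conflict on this subspace.
>         key = queue.pack_with_versionstamp((fdb.tuple.Versionstamp(),))
>         tr.set_versionstamped_key(key, fdb.tuple.pack((db_name, event_type)))
>
>     @fdb.transactional
>     def compact_queue(tr, batch_limit=1000):
>         # Snapshot read so new enqueues don't conflict with the compactor.
>         last = None
>         n = 0
>         rng = queue.range()
>         for kv in tr.snapshot.get_range(rng.start, rng.stop, limit=batch_limit):
>             event = fdb.tuple.unpack(kv.value)   # (db_name, event_type)
>             if event != last:
>                 # Distinct user_version so several index entries can be
>                 # written by the same transaction without colliding.
>                 idx_key = index.pack_with_versionstamp(
>                     (fdb.tuple.Versionstamp(user_version=n),))
>                 tr.set_versionstamped_key(idx_key, kv.value)
>                 n += 1
>             last = event
>             tr.clear(kv.key)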
>
> ### Scaling Consumers
>
> A single consumer can likely process 10k events/sec or more, but
> eventually we’ll need to scale. Borrowing from systems like Kafka the
> typical way to do this is to divide the queue into partitions and have
> individual consumers mapped to each partition. A partition in this model
> would just be a prefix on the Versionstamp:
>
> (PartitionID, Versionstamp) = (DbName, EventType)
>
> Our consumers will be more efficient and less likely to conflict with one
> another on updating the _db_updates index if messages are keyed to a
> partition based on DbName, although this still runs the risk that a couple
> of high-throughput databases could swamp a partition.
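>
> Concretely, the enqueue above would just key each message to a partition
> by hashing the database name (sketch only; the partition count is
> arbitrary and partition_for is a made-up helper):
>
>     import hashlib
>
>     import fdb
>     import fdb.tuple
>
>     fdb.api_version(630)
>
>     queue = fdb.Subspace(('db_updates_queue',))
>     NUM_PARTITIONS = 16   # arbitrary for illustration
>
>     def partition_for(db_name):
>         # Stable hash so every event for a given database lands in the
>         # same partition, and therefore is handled by the same consumer.
>         digest = hashlib.md5(db_name.encode('utf-8')).digest()
>         return int.from_bytes(digest[:4], 'big') % NUM_PARTITIONS
>
>     @fdb.transactional
>     def enqueue_event(tr, db_name, event_type):
>         key = queue.pack_with_versionstamp(
>             (partition_for(db_name), fdb.tuple.Versionstamp()))
>         tr.set_versionstamped_key(key, fdb.tuple.pack((db_name, event_type)))
>
> Each consumer would then run its compaction loop over its own
> queue[partition_id] prefix rather than the whole queue subspace.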
>
> I’m not sure about the best path forward for handling that scenario. One
> could implement a rate-limiter that starts sloughing off additional
> messages for high-throughput databases (which has some careful edge cases),
> split the messages for a single database across multiple partitions, rely
> on operators to blacklist certain databases from the _db_updates system,
> etc. Each has downsides.
>
> ## Option 2: Atomic Ops + Consumer
>
> In this approach we still have an intermediate subspace, and a consumer of
> that subspace which updates the _db_updates index. But this time, we have
> at most one KV per database in the subspace, with an atomic counter for a
> value. When a document is updated it bumps the counter for its database in
> that subspace. So we’ll have entries like
>
> (“counters”, “db1235”) = 1
> (“counters”, “db0001”) = 42
> (“counters”, “telemetry-db”) = 12312
>
> and so on. Like versionstamps, atomic operations are conflict-free so we
> need not worry about introducing spurious conflicts on high-throughput
> databases.
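>
> With the Python bindings that bump is a single conflict-free mutation
> (sketch; the subspace name is illustrative):
>
>     import struct
>
>     import fdb
>
>     fdb.api_version(630)
>
>     counters = fdb.Subspace(('counters',))
>
>     @fdb.transactional
>     def bump_counter(tr, db_name):
>         # ADD is an atomic mutation, so high-throughput databases don't
>         # generate transaction conflicts here; the parameter is a
>         # little-endian 64-bit integer, i.e. "increment by one".
>         tr.add(counters.pack((db_name,)), struct.pack('<q', 1))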
>
> The initial pass of the consumer logic would go something like this:
>
> - Do a snapshot range read of the “counters” subspace (or whatever we call
> it)
> - Record the current values for all counters in a separate summary KV
> (you’ll see why in a minute)
> - Do a limit=1 range read on the _changes space for each DB in the list to
> grab the latest Sequence
> - Update the _db_updates index with the latest Sequence for each of these
> databases
>
> On a second pass, the consumer would read the summary KV from the last
> pass and compare the previous counters with the current values. If any
> counters have not been updated in the interval, the consumer would try to
> clear those from the “counters” subspace (adding them as explicit conflict
> keys to ensure we don’t miss a concurrent update). It would then proceed
> with the rest of the logic from the initial pass (both passes are sketched
> below). This is a careful balancing act:
>
> - We don’t want to pollute the “counters” subspace with idle databases
> because each entry requires an extra read of _changes
> - We don’t want to attempt to clear counters that are constantly updated
> because that’s going to fail with a conflict every time
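>
> Both passes might look roughly like this with the Python bindings (the
> subspace layout is illustrative, and the per-pass summary is kept as one
> key per database here rather than a single summary KV, just to keep the
> sketch short):
>
>     import fdb
>     import fdb.tuple
>
>     fdb.api_version(630)
>
>     counters   = fdb.Subspace(('counters',))
>     summary    = fdb.Subspace(('counters_prev',))   # values seen last pass
>     changes    = fdb.Subspace(('changes',))         # (db_name, seq) keys
>     db_updates = fdb.Subspace(('db_updates',))
>
>     @fdb.transactional
>     def consumer_pass(tr):
>         # Counter values recorded by the previous pass.
>         srange = summary.range()
>         prev = {summary.unpack(kv.key)[0]: kv.value
>                 for kv in tr.get_range(srange.start, srange.stop)}
>
>         # Snapshot read so concurrent atomic bumps don't conflict with us.
>         crange = counters.range()
>         for kv in tr.snapshot.get_range(crange.start, crange.stop):
>             db_name = counters.unpack(kv.key)[0]
>
>             if prev.get(db_name) == kv.value:
>                 # Idle since the last pass: clear the counter, adding it
>                 # as an explicit read conflict key so a concurrent bump
>                 # aborts this transaction instead of being silently lost.
>                 tr.add_read_conflict_key(kv.key)
>                 tr.clear(kv.key)
>                 tr.clear(summary.pack((db_name,)))
>                 continue
>
>             # limit=1 reverse range read to grab the latest Sequence.
>             drange = changes.range((db_name,))
>             for ckv in tr.get_range(drange.start, drange.stop,
>                                     limit=1, reverse=True):
>                 _, seq = changes.unpack(ckv.key)
>                 tr.set(db_updates.pack((db_name,)), fdb.tuple.pack((seq,)))
>
>             # Remember this counter value for the next pass's comparison.
>             tr.set(summary.pack((db_name,)), kv.value)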
>
> The scalability axis here is the number of databases updated within any
> short window of time (~1 second or less). If we end up with that number
> growing large we can have consumers responsible for a range of the “counters”
> subspace, though I think that’s less likely than in the queue-based design.
>
> I don’t know in detail what optimizations FoundationDB applies to atomic
> operations (e.g. coalescing them at a layer above the storage engine).
> That’s worth checking into, as otherwise I’d be concerned about introducing
> super-hot keys here.
>
> This option does not handle the “created” and “deleted” lifecycle events
> for each database, but those are quite simple and could be inserted
> directly into the _db_updates index.
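>
> For example (sketch, reusing the illustrative db_updates index from
> above):
>
>     import fdb
>     import fdb.tuple
>
>     fdb.api_version(630)
>
>     db_updates = fdb.Subspace(('db_updates',))
>
>     @fdb.transactional
>     def record_lifecycle_event(tr, db_name, event_type):
>         # 'created' and 'deleted' are rare, so they can be written into
>         # the index in the same transaction that creates or deletes the
>         # database, without going through the queue or the counters.
>         tr.set(db_updates.pack((db_name,)), fdb.tuple.pack((event_type,)))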
>
> ===
>
> There are some additional details which can be fleshed out in an RFC, but
> this is the basic gist of things. Both designs would be more robust at
> capturing every single updated database (because the enqueue/increment
> operation would be part of the document update transaction). They would
> allow for a small delay between the document update and the appearance of
> the database in _db_updates, which is no different from what we have today. They
> each require a background process.
>
> Let’s hear what you think, both about the interest level for this feature
> and any comments on the designs. I may take this one over to the FDB forums
> as well for feedback. Cheers,
>
> Adam
