Hi Steven, yep, thanks for adding that. When I wrote this I was double-checking 
the behavior of consumer groups to see if it was ever useful to have more 
consumers in a consumer group than partitions in the associated topic. Seems 
like the answer is no — partitions need to be allocated with an eye towards 
consumer throughput, not just producer throughput.

> If you map a consumer with a partition, you will really suffer with 
> operational issues.

I’m curious to hear a little more about this one. Are you simply saying that 
decoupling partitions and consumers has a bunch of operational benefits, or 
are you saying that a consumer group with N consumers reading from a topic 
with N partitions is asking for trouble?

Pulsar / Kafka as a tool for change feed storage is a really interesting 
conversation. I’ve previously taken a close look at both of them for that use 
case and I do think there’s a lot of potential. I know Pulsar in particular was 
designed to solve that sort of problem at scale. I had been presuming that this 
would be a “both/and” kind of thing; i.e., every CouchDB would still have a 
built-in _changes feed, but via Pulsar I/O or Kafka Connect you could easily 
wire that feed into the message bus for improved distribution, a broader 
ecosystem, better management, etc. When you start talking about replacing the 
internal feed altogether there are a number of complications:

- Transactional updates spanning both FoundationDB and Pulsar/Kafka are a 
hard problem
- Having documents show up “exactly once” is possible with topic compaction, 
but it’s not transactional (see the sketch after this list)
- Internal consumers of the change feed (e.g., the secondary view system) 
would get more complicated
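
On the compaction point, here’s roughly what provisioning such a topic looks 
like; a minimal sketch using the confluent-kafka Python client, where the 
broker address, topic name, and sizing are all placeholders:

from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder broker address.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# cleanup.policy=compact retains only the newest record per key, so a
# reader eventually sees one record per document ID. The compaction pass
# runs asynchronously in the background, though; it is not transactional.
topic = NewTopic(
    "couchdb-changes",  # placeholder topic name
    num_partitions=8,
    replication_factor=3,
    config={"cleanup.policy": "compact"},
)
admin.create_topics([topic])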

I like that I’m not the only one thinking these things go hand-in-hand, though 
:)

Adam

> On Mar 25, 2019, at 1:06 PM, Steven Le Roux <leroux.ste...@gmail.com> wrote:
> 
> Hi Adam,
> 
> Regarding the consumer scaling point, I just want to mention that
> partitions are used in Kafka (or Pulsar) to scale a topic, but not directly
> consumers. If you map a consumer with a partition, you will really suffer
> with operational issues.
> 
> Consumers are managed in "consumer groups" that isolate them, which is a
> handy way to choose between a shared message queue and a pub/sub model. If
> you need consumers to read the same data, then you need them to have
> different consumer group ids. Consumer groups are used to track distribution
> across consumers by assigning partitions to consumers. Only one consumer in
> a consumer group can subscribe to a given partition.
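> 
> To make that concrete, a quick sketch with the confluent-kafka Python
> client (broker address and topic name are placeholders): consumers that
> share a group.id split the partitions between them, while a consumer with
> a different group.id gets its own full copy of the stream.
> 
> from confluent_kafka import Consumer
> 
> # Same group.id as other "analytics" consumers: the topic's partitions
> # are divided among the group members (shared-queue semantics).
> shared = Consumer({
>     "bootstrap.servers": "localhost:9092",
>     "group.id": "analytics",
>     "auto.offset.reset": "earliest",
> })
> shared.subscribe(["couchdb-changes"])
> 
> # A different group.id receives every message independently
> # (pub/sub semantics).
> independent = Consumer({
>     "bootstrap.servers": "localhost:9092",
>     "group.id": "audit",
>     "auto.offset.reset": "earliest",
> })
> independent.subscribe(["couchdb-changes"])
> 
> msg = independent.poll(1.0)  # returns None if nothing arrives in 1s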
> 
> While going distributed, I'd find the choice of relying on Pulsar or
> Bookkeeper or Kafka as the distributed Log very interesting.
> 
> I would find it very smart to provide two versions of CouchDB:
> - a classic standalone/master/slave one
> - a distributed one, with a FoundationDB layer for KV and a distributed
> log option for the feeds/changes.
> 
> 
> On Wed, Mar 20, 2019 at 11:47 PM Adam Kocoloski <kocol...@apache.org> wrote:
> 
>> Hi all,
>> 
>> Most of the discussions so far have focused on the core features that are
>> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
>> thought I’d start a thread on something a bit different: the _db_updates
>> feed.
>> 
>> The _db_updates feed is an API that enables users to discover database
>> lifecycle events across an entire CouchDB instance. It’s primarily useful
>> in deployments that have lots and lots of databases, where it’s impractical
>> to keep connections open for every database, and where database creations
>> and deletions may be an automated aspect of the application’s use of
>> CouchDB.
>> 
>> There are really two topics for discussion here. The first is: do we need
>> to keep it? The primary driver of applications creating lots of DBs is the
>> per-DB granularity of access controls; if we go down the route of
>> implementing the document-level _access proposal perhaps users naturally
>> migrate away from this DB-per-user data model. I’d be curious to hear
>> points of view there.
>> 
>> I’ll assume for now that we do want to keep it, and offer some thoughts on
>> how to implement it. The main challenge with _db_updates is managing the
>> write contention; in write-heavy databases you have a lot of producers
>> trying to tag that particular database as “updated”, but all the consumer
>> really cares about is getting a single “dbname”:“updated” event as needed.
>> In the current architecture we try to dedupe a lot of the events in-memory
>> before updating a regular CouchDB database with this information, but this
>> leaves us exposed to possibly dropping events within a few-second window.
>> 
>> ## Option 1: Queue + Compaction
>> 
>> One way to tackle this in FoundationDB is to have an intermediate subspace
>> reserved as a queue. Each transaction that modifies a database would insert
>> a versionstamped KV into the queue like
>> 
>> Versionstamp = (DbName, EventType)
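>> 
>> With the Python fdb bindings that insert might look roughly like this
>> (the subspace name and API version are placeholders):
>> 
>> import fdb
>> fdb.api_version(620)
>> 
>> @fdb.transactional
>> def enqueue_event(tr, db_name, event_type):
>>     # The incomplete Versionstamp is filled in by the cluster at
>>     # commit time, so concurrent writers never collide on queue keys.
>>     key = fdb.tuple.pack_with_versionstamp(
>>         ("db_updates_queue", fdb.tuple.Versionstamp()))
>>     tr.set_versionstamped_key(key, fdb.tuple.pack((db_name, event_type)))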
>> 
>> Versionstamps are monotonically increasing and inserting versionstamped
>> keys is a conflict-free operation. We’d have a consumer of this queue which
>> is responsible for “log compaction”; i.e., the consumer would do range
>> reads on the queue subspace, toss out duplicate contiguous
>> “dbname”:“updated” events, and update a second index which would look more
>> like the _changes feed.
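>> 
>> A rough sketch of that compaction pass, continuing the snippet above
>> (the single key per database in the index is a simplification of the
>> _changes-like layout):
>> 
>> @fdb.transactional
>> def compact_queue(tr):
>>     queue = fdb.Subspace(("db_updates_queue",))
>>     index = fdb.Subspace(("db_updates_index",))
>>     latest = {}
>>     # Read the queue in versionstamp order; contiguous duplicates for
>>     # the same database collapse to the most recent event.
>>     for kv in tr.get_range_startswith(queue.key(), limit=1000):
>>         db_name, event_type = fdb.tuple.unpack(kv.value)
>>         latest[db_name] = event_type
>>         del tr[kv.key]  # consume the queue entry
>>     for db_name, event_type in latest.items():
>>         tr[index.pack((db_name,))] = fdb.tuple.pack((event_type,))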
>> 
>> ### Scaling Consumers
>> 
>> A single consumer can likely process 10k events/sec or more, but
>> eventually we’ll need to scale. Borrowing from systems like Kafka the
>> typical way to do this is to divide the queue into partitions and have
>> individual consumers mapped to each partition. A partition in this model
>> would just be a prefix on the Versionstamp:
>> 
>> (PartitionID, Versionstamp) = (DbName, EventType)
>> 
>> Our consumers will be more efficient and less likely to conflict with one
>> another on updating the _db_updates index if messages are keyed to a
>> partition based on DbName, although this still runs the risk that a couple
>> of high-throughput databases could swamp a partition.
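>> 
>> Concretely, the enqueue sketch above only needs a small tweak (the
>> partition count is an arbitrary placeholder, and crc32 stands in for
>> whatever stable hash we’d pick):
>> 
>> import zlib
>> 
>> NUM_PARTITIONS = 16  # placeholder; sized to consumer throughput
>> 
>> @fdb.transactional
>> def enqueue_event(tr, db_name, event_type):
>>     # A stable hash of DbName keeps each database's events in one
>>     # partition, at the risk noted above that a hot database can
>>     # swamp its partition.
>>     partition = zlib.crc32(db_name.encode()) % NUM_PARTITIONS
>>     key = fdb.tuple.pack_with_versionstamp(
>>         ("db_updates_queue", partition, fdb.tuple.Versionstamp()))
>>     tr.set_versionstamped_key(key, fdb.tuple.pack((db_name, event_type)))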
>> 
>> I’m not sure about the best path forward for handling that scenario. One
>> could implement a rate-limiter that starts sloughing off additional
>> messages for high-throughput databases (which has some careful edge cases),
>> split the messages for a single database across multiple partitions, rely
>> on operators to blacklist certain databases from the _db_updates system,
>> etc. Each has downsides.
>> 
>> ## Option 2: Atomic Ops + Consumer
>> 
>> In this approach we still have an intermediate subspace, and a consumer of
>> that subspace which updates the _db_updates index. But this time, we have
>> at most one KV per database in the subspace, with an atomic counter for a
>> value. When a document is updated it bumps the counter for its database in
>> that subspace. So we’ll have entries like
>> 
>> (“counters”, “db1235”) = 1
>> (“counters”, “db0001”) = 42
>> (“counters”, “telemetry-db”) = 12312
>> 
>> and so on. Like versionstamps, atomic operations are conflict-free so we
>> need not worry about introducing spurious conflicts on high-throughput
>> databases.
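>> 
>> The per-update work is then a single conflict-free mutation; a minimal
>> sketch (subspace name again a placeholder):
>> 
>> import struct
>> 
>> counters = fdb.Subspace(("counters",))
>> 
>> @fdb.transactional
>> def bump_counter(tr, db_name):
>>     # ADD is an atomic mutation: no read and no read-conflict range,
>>     # so any number of writers can bump the same key concurrently.
>>     tr.add(counters.pack((db_name,)), struct.pack("<q", 1))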
>> 
>> The initial pass of the consumer logic would go something like this (see
>> the sketch after the list):
>> 
>> - Do a snapshot range read of the “counters” subspace (or whatever we call
>> it)
>> - Record the current values for all counters in a separate summary KV
>> (you’ll see why in a minute)
>> - Do a limit=1 reverse range read on the _changes space for each DB in the
>> list to grab the latest Sequence
>> - Update the _db_updates index with the latest Sequence for each of these
>> databases
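>> 
>> In fdb-Python terms, roughly (continuing from the bump_counter sketch;
>> the subspace names are placeholders and the real _changes layout is
>> hand-waved):
>> 
>> changes = fdb.Subspace(("changes",))      # per-db: changes[db][seq]
>> db_updates = fdb.Subspace(("db_updates",))
>> summary = fdb.Subspace(("summary",))
>> 
>> @fdb.transactional
>> def first_pass(tr):
>>     seen = []
>>     # Snapshot read: observing the counters must not create conflicts.
>>     for kv in tr.snapshot.get_range_startswith(counters.key()):
>>         (db_name,) = counters.unpack(kv.key)
>>         seen.append(db_name)
>>         # Record current values so the next pass can spot idle DBs.
>>         tr[summary.pack((db_name,))] = kv.value
>>     for db_name in seen:
>>         # limit=1 reverse read grabs the latest Sequence for the DB.
>>         r = changes[db_name].range()
>>         for kv in tr.get_range(r.start, r.stop, limit=1, reverse=True):
>>             (seq,) = changes[db_name].unpack(kv.key)
>>             tr[db_updates.pack((db_name,))] = fdb.tuple.pack((seq,))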
>> 
>> On a second pass, the consumer would read the summary KV from the last
>> pass and compare the previous counters with the current values. If any
>> counters have not been updated in the interval, the consumer would try to
>> clear those from the “counters” subspace (adding them as explicit conflict
>> keys to ensure we don’t miss a concurrent update). It would then proceed
>> with the rest of the logic from the initial pass (see the sketch after
>> these bullets). This is a careful balancing act:
>> 
>> - We don’t want to pollute the “counters” subspace with idle databases
>> because each entry requires an extra read of _changes
>> - We don’t want to attempt to clear counters that are constantly updated
>> because that’s going to fail with a conflict every time
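>> 
>> The clearing step might look like this (a sketch, continuing from
>> first_pass above):
>> 
>> @fdb.transactional
>> def second_pass(tr):
>>     for kv in tr.snapshot.get_range_startswith(summary.key()):
>>         (db_name,) = summary.unpack(kv.key)
>>         key = counters.pack((db_name,))
>>         if tr.snapshot[key] == kv.value:
>>             # Counter looks idle. Register an explicit read conflict
>>             # so a concurrent bump aborts this transaction instead of
>>             # being lost, then clear the counter and its summary.
>>             tr.add_read_conflict_key(key)
>>             del tr[key]
>>             del tr[summary.pack((db_name,))]
>>     # ...then proceed with the first_pass logic as before.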
>> 
>> The scalability axis here is the number of databases updated within any
>> short window of time (~1 second or less). If we end up with that number
>> growing large we can have consumers responsible for ranges of the “counters”
>> subspace, though I think that’s less likely than in the queue-based design.
>> 
>> I don’t know in detail what optimizations FoundationDB applies to atomic
>> operations (e.g. coalescing them at a layer above the storage engine).
>> That’s worth checking into, as otherwise I’d be concerned about introducing
>> super-hot keys here.
>> 
>> This option does not handle the “created” and “deleted” lifecycle events
>> for each database, but those are quite simple and could be inserted
>> directly into the _db_updates index.
>> 
>> ===
>> 
>> There are some additional details which can be fleshed out in an RFC, but
>> this is the basic gist of things. Both designs would be more robust at
>> capturing every single updated database (because the enqueue/increment
>> operation would be part of the document update transaction). They would
>> allow for a small delay between the document update and the appearance of
>> the database in _db_updates, which is no different from what we have today.
>> They
>> each require a background process.
>> 
>> Let’s hear what you think, both about the interest level for this feature
>> and any comments on the designs. I may take this one over to the FDB forums
>> as well for feedback. Cheers,
>> 
>> Adam
