Adam,

> I think we have inconsistent definitions of Active-Active

Yes, this terminology gets thrown around a lot. IMO "active" means both
producers and consumers are using a cluster under normal operation -- not
just during outages, and not just by something like MM2. (Obviously, MM2
has producers and consumers, but they don't count here.) Conversely,
"standby" or "backup" means that data is being written by a producer, but
it isn't being consumed under normal operation. I qualify this definition
with IMO, as I don't think there is strong consensus here.

I'll also add a caveat about "under normal operation". An active/active
architecture does not necessarily mean that you use both clusters in the
same way all the time -- only that you _could_. You could split your traffic
50/50 between the two clusters, or you could direct 100% to one cluster and
0% to the other, e.g. if one is farther away or has fewer hardware resources.
But the architecture remains the same (and certainly, MM2 doesn't care
about this detail).

> The producer is only producing to one cluster (primary) and one topic
(topic "table"), and the other cluster (secondary) contains only a
replication of the data via MM2 ("primary.table").

That, by definition, is not active/active.

>What you seemed to be proposing is that the producer's "table" data is
sent fully to each cluster, such that the state can be materialized as a
KTable in each application running on each cluster.

Correct.

> This wouldn't require MM2 at all, so I'm not sure if this is what you
advocated.

You could use a dual-ingest method and send all your data to both clusters,
which would not require MM2. There are many issues with this approach,
primarily with respect to consistency and efficiency.

> The trivial solution seems to be to make your producers produce all
stateful data (topic "table") to each cluster, which makes MM2 unnecessary,
but can also lead to data inconsistencies so it's not exactly foolproof.

Yes, that's something like "dual ingest", which I would not recommend.

> StreamsAppPrimary is consuming from ("table")

What is "table" exactly? I am interpreting this as a KTable changelog
topic, in which case "table" is an output topic of some streams app, i.e.
the app producing the change events. _This_ is the app I'm suggesting you
run on both clusters. Then, "table" will appear on both clusters (no
"primary.table").

The app that is creating the "table" changelog would be processing events
from some other topic, say "events". Then, this is what I recommend:

Primary cluster:
Topics: events, secondary.events, table-changelog
App subscription: events, secondary.events
App output: table-changelog

Secondary cluster:
Topics: events, primary.events, table-changelog
App subscription: events, primary.events
App output: table-changelog

With this arrangement, the app on either cluster will have built up state
in RocksDB based on events from both clusters.
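
For illustration, the app on the primary cluster might look something like
the rough sketch below. The topic names match the layout above; everything
else (app id, bootstrap servers, store name, string serdes, and the
keep-latest-value aggregation) is just a placeholder, not a prescription.
The secondary cluster runs the same app pointed at its own brokers.

import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TableBuilderApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "table-builder");
        // Point at the local cluster; the secondary runs the same app against its own brokers.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "primary-kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Subscribe to the local "events" topic plus any MM2 remote copy of it
        // ("secondary.events" here, "primary.events" when run on the secondary cluster).
        KStream<String, String> events = builder.stream(Pattern.compile("(\\w+\\.)?events"));

        events.groupByKey()
              // Placeholder aggregation: keep the latest value per key in a local RocksDB store.
              .reduce((oldValue, newValue) -> newValue,
                      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"))
              // Emit the materialized state as a changelog topic for downstream apps.
              .toStream()
              .to("table-changelog");

        new KafkaStreams(builder.build(), props).start();
    }
}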

Now, it seems you also want a second app to process this changelog. I can
see a few scenarios:

1) you want to take some external action based on records in the table
changelog, e.g. to send an email every time a password is updated. In this
case, you don't want this app running in both clusters, as you'd get two
emails. So you could run it in one cluster and use offset translation to
migrate during failover. The send-email app is stateless, so you just need
to translate and reset offsets (there is no internal state to rebuild).
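
A rough sketch of that failover step, assuming the RemoteClusterUtils helper
proposed in KIP-382 and that MM2 checkpointing covers the topics this app
consumes; the group id, cluster alias, and bootstrap servers below are
placeholders:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class FailoverOffsetTranslation {
    public static void main(String[] args) throws Exception {
        // Connection properties for the secondary (target) cluster.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "secondary-kafka:9092");

        // Translate the send-email app's committed offsets from the "primary" cluster
        // into equivalent offsets on the secondary, using MM2's checkpoints.
        Map<TopicPartition, OffsetAndMetadata> offsets = RemoteClusterUtils.translateOffsets(
                props, "primary", "send-email-app", Duration.ofSeconds(30));

        // Commit the translated offsets under the same group id on the secondary cluster,
        // then restart the (stateless) send-email app pointed at the secondary.
        Map<String, Object> consumerProps = new HashMap<>(props);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "send-email-app");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.assign(offsets.keySet());
            consumer.commitSync(offsets);
        }
    }
}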

2) you want to use the table changelog in a stateful but non-effecting way
(no external side effects), e.g. by keeping a running count of records. This
app, like the changelog-producing app above, can be run in both clusters.

3) you want some combination of state and external actions in one big app.
In this case, I'd consider splitting your app in two so that you can build
state in both clusters while effecting external actions in only one cluster
at a time.
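
Roughly, with hypothetical names and building on the sketch above -- the
stateful half is deployed to both clusters, the effecting half to one
cluster at a time:

import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class SplitTopologies {

    // Stateful half: deploy to BOTH clusters so the RocksDB state is warm everywhere.
    // Same idea as the earlier sketch: consume local + remote events, keep latest per key.
    static Topology statefulHalf() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(Pattern.compile("(\\w+\\.)?events"))
               .groupByKey()
               .reduce((oldValue, newValue) -> newValue)
               .toStream()
               .to("table-changelog");
        return builder.build();
    }

    // Effecting half: deploy to ONE cluster at a time. It is stateless, so failover
    // is just offset translation + restart against the other cluster, as in (1).
    static Topology effectingHalf() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("table-changelog")
               .foreach((key, change) -> sendEmail(key, change)); // external side effect
        return builder.build();
    }

    static void sendEmail(Object key, Object change) { /* hypothetical notifier */ }
}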

Lemme know if that makes sense.

Ryanne

On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi Ryanne
>
> I think we have inconsistent definitions of Active-Active. The producer is
> only producing to one cluster (primary) and one topic (topic "table"), and
> the other cluster (secondary) contains only a replication of the data via
> MM2 ("primary.table"). What you seemed to be proposing is that the
> producer's "table" data is sent fully to each cluster, such that the state
> can be materialized as a KTable in each application running on each
> cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
> you advocated.
>
> You also state that "As with normal consumers, the Streams app should 
> *subscribe
> to any remote topics*, e.g. with a regex, s.t. the application state will
> reflect input from either source cluster.". Wouldn't this mean that the
> stateful "table" topic that we wish to materialize would be replicated by
> MM2 from Primary, such that we end up with the following:
>
> *Replicated Entity/Stateful Data:*
> *Primary Cluster: (Live)*
> Topic: "table" (contains data from T = 0 to T = n)
> StreamsAppPrimary is consuming from ("table")
>
> *Secondary Cluster: (Live)*
> Topic: "primary.table" (contains data from T = 0 to T = n)
> StreamsAppSecondary is consuming from ("primary.table")
>
> What does StreamsAppSecondary do when "primary.table" is no longer
> replicated because Primary has died? Additionally, where should the
> producer of topic "table" now write its data to, assuming that Primary
> Cluster is irrevocably lost?
>
> I hope this better outlines my scenario. The trivial solution seems to be
> to make your producers produce all stateful data (topic "table") to each
> cluster, which makes MM2 unnecessary, but can also lead to data
> inconsistencies so it's not exactly foolproof.
>
> Thanks
>
> On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
>> Hello Adam, thanks for the questions. Yes my organization uses Streams,
>> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
>> you are describing.
>>
>> The architecture you mention is more "active/standby" than
>> "active/active" IMO. The "secondary" cluster is not being used until a
>> failure, at which point you migrate your app and expect the data to already
>> be there. This works for normal consumers where you can seek() and
>> --reset-offsets. Streams apps can be reset with the
>> kafka-streams-application-reset tool, but as you point out, that doesn't
>> help with rebuilding an app's internal state, which would be missing on the
>> secondary cluster. (Granted, that may be okay depending on your particular
>> application.)
>>
>> A true "active/active" solution IMO would be to run your same Streams app
>> in _both_ clusters (primary, secondary), s.t. the entire application state
>> is available and continuously updated in both clusters. As with normal
>> consumers, the Streams app should subscribe to any remote topics, e.g. with
>> a regex, s.t. the application state will reflect input from either source
>> cluster.
>>
>> This is essentially what Streams' "standby replicas" are -- extra copies
>> of application state to support quicker failover. Without these replicas,
>> Streams would need to start back at offset 0 and re-process everything in
>> order to rebuild state (which you don't want to do during a disaster,
>> especially!). The same logic applies to using Streams with MM2. You _could_
>> failover by resetting the app and rebuilding all the missing state, or you
>> could have a copy of everything sitting there ready when you need it. The
>> easiest way to do the latter is to run your app in both clusters.
>>
>> Hope that helps.
>>
>> Ryanne
>>
>> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare <adam.bellem...@gmail.com>
>> wrote:
>>
>>> Hi Ryanne
>>>
>>> I have a quick question for you about Active+Active replication and
>>> Kafka Streams. First, does your org / do you use Kafka Streams? If not, then
>>> I think this conversation can end here. ;)
>>>
>>> Secondly, and for the broader Kafka Dev group - what happens if I want
>>> to use Active+Active replication with my Kafka Streams app, say, to
>>> materialize a simple KTable? Based on my understanding, a topic "table" on
>>> the primary cluster will be replicated to the secondary cluster as
>>> "primary.table". In the case of a full cluster failure for primary, the
>>> producer to topic "table" on the primary switches over to the secondary
>>> cluster, creates its own "table" topic and continues to write there. So
>>> now, assuming we have had no data loss, we end up with:
>>>
>>>
>>> *Primary Cluster: (Dead)*
>>>
>>>
>>> *Secondary Cluster: (Live)*
>>> Topic: "primary.table" (contains data from T = 0 to T = n)
>>> Topic: "table" (contains data from T = n+1 to now)
>>>
>>> If I want to materialize state using Kafka Streams, obviously I am
>>> now in a bit of a pickle since I need to consume "primary.table" before I
>>> consume "table". Have you encountered rebuilding state in Kafka Streams
>>> using Active-Active? For non-Kafka Streams I can see using a single
>>> consumer for "primary.table" and one for "table", interleaving the
>>> timestamps and performing basic event dispatching based on my own tracked
>>> stream-time, but for Kafka Streams I don't think there exists a solution to
>>> this.
>>>
>>> If you have any thoughts on this or some recommendations for Kafka
>>> Streams with Active-Active I would be very appreciative.
>>>
>>> Thanks
>>> Adam
>>>
>>>
>>>
