Hi Ryanne,

Thanks for the clarifications! Here is one of my own, as I think it's the biggest stumbling block in my description:
*> What is "table" exactly? I am interpreting this as a KTable changelog topic* "table" is not a KTable changelog topic, but simply entity data that is to be materialized into a table - for example, relational data captured from Kafka Connect. I should have named this "stateful-data" or something less ambiguous and provided an explicit definition. Note that non-KStreams applications will also regularly use this entity data to materialize their own tables, but it in itself is not a KTable internal changelog. Per your example 1, let's name this topic "userEntity". It could be a (key,value) pair of (userId, emailAddress), where I only want the latest emailAddress (basic materialization) to send an email on account password update. I only want to run the application against one Kafka cluster, and because I don't want to use dual-ingest, I am running that application only on the cluster where the data is being sent (Primary Cluster). In a scenario where all replication is working correctly I could also run this off the Secondary cluster's replica, "primary.userEntity" *> Yes, that's something like "dual ingest", which I would not recommend.* Agreed. I do not want to use dual ingest. *> Secondary cluster:* *> Topics: events, primary.events, table-changelog* *> App subscription: events, primary.events* *> App output: table-changelog* Is the "events" topic dual ingest, since it exists in the Primary cluster with the exact same name? The whole scenario can be boiled down into the following: 1) Entity data is in a userEntity topic, ie: (userId, emailAddress) 2) I want to publish it into an Active-Active cluster setup without using dual-ingest 3) I want to materialize the data into a single table for an application consuming from a single cluster (Kafka Streams or not) 4) I want to be able to fail over and rebuild the materialized state using the data I have replicated. - If all of the entity data is produced to each cluster (dual-ingest) than it is trivial to fail over and rebuild the materialized table. - If the data is only produced to Primary and only replicated to Secondary, at a failover I would need to consume from the replicated topic. * Q1) Where does the producer write its data to if the primary cluster is dead?* It seems to me that it must then write its data to the only remaining cluster. This would then put the entity data in two topics as I had originally outlined, as below: * Secondary Cluster: (Live) (renamed table to userEntity)* Topic: "primary.userEntity" (contains data from T = 0 to T = n) Topic: "userEntity" (contains data from T = n+1 to now, the failed-over producer) * Q2) How does a Kafka Streams application materialize state from two topics? (loaded question, I know)* Since I know this isn't built in, is there some sort of technique or system that you use to allow for a single virtual topic made up of many logical topics? *Q3) Do you have any recommendations on how to handle replication/producing of entity-data (ie: userEntity) across multiple clusters, such that an application may correctly (or even near-correctly) materialize state after a failover like the one I described above?* This is really the golden question. We're currently developing our Active-Passive approach, but we want to be prepared for scenarios where we have multiple clusters with entity-replication between clusters. Thanks Ryanne! 
On Tue, Jul 23, 2019 at 12:39 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> Adam,
>
> > I think we have inconsistent definitions of Active-Active
>
> Yes, this terminology gets thrown around a lot. IMO "active" means both producers and consumers are using a cluster under normal operation -- not just during outages, and not just by something like MM2. (Obviously, MM2 has producers and consumers, but they don't count here.) Conversely, "standby" or "backup" means that data is being written by a producer, but it isn't being consumed under normal operation. I qualify this definition with IMO, as I don't think there is strong consensus here.
>
> I'll also add a caveat about "under normal operation". An active/active architecture does not necessarily mean that you use both clusters in the same way all the time -- only that you _could_. You could load-balance 50/50 of your traffic between two clusters, or you could direct 100% to one and 0% to the other, e.g. if one is farther away or has less hw resources. But the architecture remains the same (and certainly, MM2 doesn't care about this detail).
>
> > The producer is only producing to one cluster (primary) and one topic (topic "table"), and the other cluster (secondary) contains only a replication of the data via MM2 ("primary.table").
>
> That, by definition, is not active/active.
>
> > What you seemed to be proposing is that the producer's "table" data is sent fully to each cluster, such that the state can be materialized as a KTable in each application running on each cluster.
>
> Correct.
>
> > This wouldn't require MM2 at all, so I'm not sure if this is what you advocated.
>
> You could use a dual-ingest method and send all your data to both clusters, which would not require MM2. There are many issues with this approach, primarily wrt consistency and efficiency.
>
> > The trivial solution seems to be to make your producers produce all stateful data (topic "table") to each cluster, which makes MM2 unnecessary, but can also lead to data inconsistencies so it's not exactly foolproof.
>
> Yes, that's something like "dual ingest", which I would not recommend.
>
> > StreamsAppPrimary is consuming from ("table")
>
> What is "table" exactly? I am interpreting this as a KTable changelog topic, in which case "table" is an output topic of some streams app, i.e. the app producing the change events. _This_ is the app I mean to suggest you run on both clusters. Then, "table" will appear on both clusters (no "primary.table").
>
> The app that is creating the "table" changelog would be processing events from some other topic, say "events". Then, this is what I recommend:
>
> Primary cluster:
> Topics: events, secondary.events, table-changelog
> App subscription: events, secondary.events
> App output: table-changelog
>
> Secondary cluster:
> Topics: events, primary.events, table-changelog
> App subscription: events, primary.events
> App output: table-changelog
>
> With this arrangement, the app on either cluster will have built up state in RocksDB based on events from both clusters.
>
> Now, it seems you also want a second app to process this changelog. I can see a few scenarios:
>
> 1) you want to take some external action based on records in the table changelog, e.g. to send an email every time a password is updated. In this case, you don't want this app running in both clusters, as you'd get two emails.
> So you could run it in one cluster and use offset translation to migrate during failover. The send-email app is stateless, so you just need to translate and reset offsets (there is no internal state to rebuild).
>
> 2) you want to use the table changelog in a stateful but non-effecting way, e.g. by keeping a running count of records. This app, like the first, can be run in both clusters.
>
> 3) you want some combination of state and external actions in one big app. In this case, I'd consider splitting your app in two so that you can build state in both clusters while effecting external actions in only one cluster at a time.
>
> Lemme know if that makes sense.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
>> Hi Ryanne
>>
>> I think we have inconsistent definitions of Active-Active. The producer is only producing to one cluster (primary) and one topic (topic "table"), and the other cluster (secondary) contains only a replication of the data via MM2 ("primary.table"). What you seemed to be proposing is that the producer's "table" data is sent fully to each cluster, such that the state can be materialized as a KTable in each application running on each cluster. This wouldn't require MM2 at all, so I'm not sure if this is what you advocated.
>>
>> You also state that "As with normal consumers, the Streams app should *subscribe to any remote topics*, e.g. with a regex, s.t. the application state will reflect input from either source cluster.". Wouldn't this mean that the stateful "table" topic that we wish to materialize would be replicated by MM2 from Primary, such that we end up with the following:
>>
>> *Replicated Entity/Stateful Data:*
>> *Primary Cluster: (Live)*
>> Topic: "table" (contains data from T = 0 to T = n)
>> StreamsAppPrimary is consuming from ("table")
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> StreamsAppSecondary is consuming from ("primary.table")
>>
>> What does StreamsAppSecondary do when "primary.table" is no longer replicated because Primary has died? Additionally, where should the producer of topic "table" now write its data to, assuming that Primary Cluster is irrevocably lost?
>>
>> I hope this better outlines my scenario. The trivial solution seems to be to make your producers produce all stateful data (topic "table") to each cluster, which makes MM2 unnecessary, but can also lead to data inconsistencies so it's not exactly foolproof.
>>
>> Thanks
>>
>> On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>
>>> Hello Adam, thanks for the questions. Yes my organization uses Streams, and yes you can use Streams with MM2/KIP-382, though perhaps not in the way you are describing.
>>>
>>> The architecture you mention is more "active/standby" than "active/active" IMO. The "secondary" cluster is not being used until a failure, at which point you migrate your app and expect the data to already be there. This works for normal consumers where you can seek() and --reset-offsets. Streams apps can be reset with the kafka-streams-application-reset tool, but as you point out, that doesn't help with rebuilding an app's internal state, which would be missing on the secondary cluster. (Granted, that may be okay depending on your particular application.)
>>>
>>> A true "active/active" solution IMO would be to run your same Streams app in _both_ clusters (primary, secondary), s.t. the entire application state is available and continuously updated in both clusters. As with normal consumers, the Streams app should subscribe to any remote topics, e.g. with a regex, s.t. the application state will reflect input from either source cluster.
>>>
>>> This is essentially what Streams' "standby replicas" are -- extra copies of application state to support quicker failover. Without these replicas, Streams would need to start back at offset 0 and re-process everything in order to rebuild state (which you don't want to do during a disaster, especially!). The same logic applies to using Streams with MM2. You _could_ failover by resetting the app and rebuilding all the missing state, or you could have a copy of everything sitting there ready when you need it. The easiest way to do the latter is to run your app in both clusters.
>>>
>>> Hope that helps.
>>>
>>> Ryanne
>>>
>>> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>
>>>> Hi Ryanne
>>>>
>>>> I have a quick question for you about Active+Active replication and Kafka Streams. First, does your org / do you use Kafka Streams? If not then I think this conversation can end here. ;)
>>>>
>>>> Secondly, and for the broader Kafka Dev group - what happens if I want to use Active+Active replication with my Kafka Streams app, say, to materialize a simple KTable? Based on my understanding, a topic "table" on the primary cluster will be replicated to the secondary cluster as "primary.table". In the case of a full cluster failure for primary, the producer to topic "table" on the primary switches over to the secondary cluster, creates its own "table" topic and continues to write to there. So now, assuming we have had no data loss, we end up with:
>>>>
>>>> *Primary Cluster: (Dead)*
>>>>
>>>> *Secondary Cluster: (Live)*
>>>> Topic: "primary.table" (contains data from T = 0 to T = n)
>>>> Topic: "table" (contains data from T = n+1 to now)
>>>>
>>>> If I want to materialize state using Kafka Streams, obviously I am now in a bit of a pickle since I need to consume "primary.table" before I consume "table". Have you encountered rebuilding state in Kafka Streams using Active-Active? For non-Kafka Streams I can see using a single consumer for "primary.table" and one for "table", interleaving the timestamps and performing basic event dispatching based on my own tracked stream-time, but for Kafka Streams I don't think there exists a solution to this.
>>>>
>>>> If you have any thoughts on this or some recommendations for Kafka Streams with Active-Active I would be very appreciative.
>>>>
>>>> Thanks
>>>> Adam
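For reference, the "two consumers interleaved by timestamp" approach described in the original message above might look roughly like the following untested sketch. The topic names follow the userEntity example rather than "table", and the bootstrap address, group ids, and the naive "drain whichever buffer has data" fallback are all assumptions, not a recommendation:

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InterleavingMaterializer {

    // One plain consumer per logical topic, both against the surviving cluster.
    private static KafkaConsumer<String, String> consumerFor(String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "secondary-cluster:9092"); // assumed address
        props.put("group.id", "user-entity-materializer-" + topic);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> replica = consumerFor("primary.userEntity");
        KafkaConsumer<String, String> local = consumerFor("userEntity");

        Deque<ConsumerRecord<String, String>> replicaBuf = new ArrayDeque<>();
        Deque<ConsumerRecord<String, String>> localBuf = new ArrayDeque<>();
        Map<String, String> table = new HashMap<>(); // userId -> latest emailAddress

        while (true) {
            if (replicaBuf.isEmpty()) {
                replica.poll(Duration.ofMillis(200)).forEach(replicaBuf::add);
            }
            if (localBuf.isEmpty()) {
                local.poll(Duration.ofMillis(200)).forEach(localBuf::add);
            }
            if (replicaBuf.isEmpty() && localBuf.isEmpty()) {
                continue; // nothing buffered yet
            }

            // Dispatch whichever buffered record has the lower timestamp, tracking
            // our own stream-time. Simplification: if one buffer is empty we assume
            // that topic is caught up and drain the other, which is not safe if it
            // is merely lagging.
            Deque<ConsumerRecord<String, String>> next;
            if (replicaBuf.isEmpty()) {
                next = localBuf;
            } else if (localBuf.isEmpty()) {
                next = replicaBuf;
            } else {
                next = replicaBuf.peek().timestamp() <= localBuf.peek().timestamp()
                        ? replicaBuf : localBuf;
            }

            ConsumerRecord<String, String> record = next.poll();
            table.put(record.key(), record.value()); // materialize latest value per key
        }
    }
}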