Re: KIP-382 + Kafka Streams Question

2019-07-24 Thread Adam Bellemare
Hi Ryanne

> Lemme know if I haven't answered this clearly.

Nope, this was very helpful. Thank you!

> A single "stream" can come from multiple input topics
I overlooked that - I was thinking of simply using the
StreamsBuilder.table() functionality instead, but that function doesn't
support a Collection of topics.

Since the topics would be co-partitioned by definition, wouldn't the event
dispatcher in PartitionGroup (priority queue and stream-time ordering) ensure
that the topics are processed in increasing stream-time order?

Alternatively, I suppose this could be a case where it is a good idea to have
the timestamp of the event within the event's value payload, such that:
StreamsBuilder.stream(Set("userEntity", "primary.userEntity"))
.groupByKey()
.reduce(...)
can allow us to materialize the latest state for a given key.
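Concretely, something like this rough sketch is what I have in mind (UserEvent and its
getTimestamp() accessor are made up; the reducer compares the timestamp embedded in the
value rather than relying on arrival order across the two topics):

import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

class LatestUserEntity {
    // UserEvent is a hypothetical value type carrying its own timestamp.
    static KTable<String, UserEvent> build(StreamsBuilder builder) {
        return builder
            .<String, UserEvent>stream(Arrays.asList("userEntity", "primary.userEntity"))
            .groupByKey()
            .reduce((current, incoming) ->
                // keep whichever record carries the larger embedded timestamp,
                // regardless of which topic it arrived from
                incoming.getTimestamp() >= current.getTimestamp() ? incoming : current);
    }
}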

Thanks Ryanne, this has been a very helpful discussion for me. We are
prototyping the usage of MM2 internally at the moment in anticipation of
its release in 2.4 and want to ensure we have our replication + recovery
strategies sorted out.

Adam

On Tue, Jul 23, 2019 at 7:26 PM Ryanne Dolan  wrote:

> Adam, I think we are converging :)
>
> > "userEntity"...where I only want the latest emailAddress (basic
> materialization) to send an email on account password update.
>
> Yes, you want all "userEntity" data on both clusters. Each cluster will
> have "userEntity" and the remote counterpart
> "secondary/primary.userEntity", as in my example (1). The send-email part
> can run on either cluster (but not both, to avoid duplicate emails),
> subscribing to both "userEntity" and "secondary/primary.userEntity". For
> DR, you can migrate this app between clusters via offset translation and
> the kafka-streams-application-reset tool.
>
> Then, you want a materialize-email-table app running in _both_ clusters,
> so that the latest emails are readily available in RocksDB from either
> cluster. This also subscribes to both "userEntity" and
> "secondary/primary.userEntity" s.t. records originating from either cluster
> are processed.
>
> (Equivalently, send-email and materialize-email-table could be parts of
> the same Streams app, just configured differently, e.g. with send-email
> short-circuited in all but one cluster.)
>
> Under normal operation, your userEntity events are sent to the primary
> cluster (topic: userEntity), processed there via materialize-email-table
> and send-email, and replicated to the secondary cluster (topic:
> primary.userEntity) via MM2. When primary goes down, your producers
> (whatever is sending userEntity events) can failover to the secondary
> cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
> the producer detects an outage or via a load balancer with healthchecks
> etc. So under normal operation, you have all userEntity events in both
> clusters, and both clusters are available for producing to.
>
> N.B. this is not dual-ingest, which would require you always produce
> directly to both clusters. It's active/active, b/c you can produce to
> either cluster at any point in time, and the effect is the same.
>
> > Q1) Where does the producer write its data to if the primary cluster is
> dead?
>
> With active/active like this, you can send to either cluster.
>
> > Q2) How does a Kafka Streams application materialize state from two
> topics?
>
> A Streams app can subscribe to multiple topics. A single "stream" can come
> from multiple input topics (see:
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
> )
>
> Likewise, a KTable can be materialized from multiple source topics -- in
> this case, userEntity, primary.userEntity and/or secondary.userEntity. You
> can think of these as parts of a "virtual topic", as you described.
>
> > (loaded question, I know)
>
> There is one caveat I can think of: there is no ordering guarantee across
> different topics in the same stream, so materialization could be
> inconsistent between the two clusters if, say, the same user's email was
> changed to different values at the same millisecond in both clusters. This
> may or may not be a problem.
>
> > Q3) ... recommendations on how to handle replication/producing of
> entity-data (ie: userEntity) across multiple clusters...
>
> Lemme know if I haven't answered this clearly.
>
> Ryanne
>
> On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> Thanks for the clarifications! Here is one of my own, as I think it's the
>> biggest stumbling block in my description:
>>
>> *> What is "table" exactly? I am interpreting this as a KTable changelog
>> topic*
>> "table" is not a KTable changelog topic, but simply entity data that is
>> to be materialized into a table - for example, relational data captured
>> from Kafka Connect. I should have named this "stateful-data" or something
>> less ambiguous and provided an explicit definition. Note that non-KStreams
>> applications will also regularly use 

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Ryanne Dolan
Adam, I think we are converging :)

> "userEntity"...where I only want the latest emailAddress (basic
materialization) to send an email on account password update.

Yes, you want all "userEntity" data on both clusters. Each cluster will
have "userEntity" and the remote counterpart
"secondary/primary.userEntity", as in my example (1). The send-email part
can run on either cluster (but not both, to avoid duplicate emails),
subscribing to both "userEntity" and "secondary/primary.userEntity". For
DR, you can migrate this app between clusters via offset translation and
the kafka-streams-application-reset tool.

Then, you want a materialize-email-table app running in _both_ clusters, so
that the latest emails are readily available in RocksDB from either
cluster. This also subscribes to both "userEntity" and
"secondary/primary.userEntity" s.t. records originating from either cluster
are processed.

(Equivalently, send-email and materialize-email-table could be parts of the
same Streams app, just configured differently, e.g. with send-email
short-circuited in all but one cluster.)
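Roughly, such a combined app could look like this sketch (remotePrefix and the email
helper are placeholders; sendEmailsHere would be true in exactly one cluster so the
side effect is not duplicated):

import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

class UserEntityApp {
    // remotePrefix is "primary" or "secondary", depending on which cluster
    // this instance runs in; sendEmailsHere is true in exactly one cluster.
    static StreamsBuilder build(String remotePrefix, boolean sendEmailsHere) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> users =
            builder.stream(Arrays.asList("userEntity", remotePrefix + ".userEntity"));

        // materialize-email-table: runs in both clusters, keeps the latest value per user
        KTable<String, String> emailTable =
            users.groupByKey().reduce((oldVal, newVal) -> newVal);

        if (sendEmailsHere) {
            // send-email: short-circuited everywhere except one cluster
            users.foreach((userId, value) -> sendEmail(userId, value));
        }
        return builder;
    }

    private static void sendEmail(String userId, String value) {
        System.out.printf("email %s: %s%n", userId, value); // placeholder side effect
    }
}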

Under normal operation, your userEntity events are sent to the primary
cluster (topic: userEntity), processed there via materialize-email-table
and send-email, and replicated to the secondary cluster (topic:
primary.userEntity) via MM2. When primary goes down, your producers
(whatever is sending userEntity events) can failover to the secondary
cluster (topic: userEntity). This can happen in real-time, i.e. as soon as
the producer detects an outage or via a load balancer with healthchecks
etc. So under normal operation, you have all userEntity events in both
clusters, and both clusters are available for producing to.

N.B. this is not dual-ingest, which would require you always produce
directly to both clusters. It's active/active, b/c you can produce to
either cluster at any point in time, and the effect is the same.
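On the producer side, the crudest form of that detect-and-failover could look like this
sketch (the addresses, the fail-fast timeout and the single blind retry are all placeholder
choices; a health-checked load balancer or service discovery is the more realistic option):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class FailoverUserEntityProducer {
    private final String[] clusters = {"primary:9092", "secondary:9092"}; // placeholder addresses
    private int active = 0;
    private KafkaProducer<String, String> producer = newProducer(clusters[active]);

    private static KafkaProducer<String, String> newProducer(String bootstrap) {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000); // fail fast when the cluster is unreachable
        return new KafkaProducer<>(p);
    }

    void send(String userId, String value) {
        try {
            // the topic keeps its local name ("userEntity") on whichever cluster is active
            producer.send(new ProducerRecord<>("userEntity", userId, value)).get();
        } catch (Exception e) {
            producer.close();
            active = 1 - active;                      // flip to the other cluster
            producer = newProducer(clusters[active]);
            producer.send(new ProducerRecord<>("userEntity", userId, value)); // retry once
        }
    }
}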

> Q1) Where does the producer write its data to if the primary cluster is
dead?

With active/active like this, you can send to either cluster.

> Q2) How does a Kafka Streams application materialize state from two
topics?

A Streams app can subscribe to multiple topics. A single "stream" can come
from multiple input topics (see:
https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.Collection-
)

Likewise, a KTable can be materialized from multiple source topics -- in
this case, userEntity, primary.userEntity and/or secondary.userEntity. You
can think of these as parts of a "virtual topic", as you described.

> (loaded question, I know)

There is one caveat I can think of: there is no ordering guarantee across
different topics in the same stream, so materialization could be
inconsistent between the two clusters if, say, the same user's email was
changed to different values at the same millisecond in both clusters. This
may or may not be a problem.

> Q3) ... recommendations on how to handle replication/producing of
entity-data (ie: userEntity) across multiple clusters...

Lemme know if I haven't answered this clearly.

Ryanne

On Tue, Jul 23, 2019 at 1:03 PM Adam Bellemare 
wrote:

> Hi Ryanne
>
> Thanks for the clarifications! Here is one of my own, as I think it's the
> biggest stumbling block in my description:
>
> *> What is "table" exactly? I am interpreting this as a KTable changelog
> topic*
> "table" is not a KTable changelog topic, but simply entity data that is to
> be materialized into a table - for example, relational data captured from
> Kafka Connect. I should have named this "stateful-data" or something less
> ambiguous and provided an explicit definition. Note that non-KStreams
> applications will also regularly use this entity data to materialize their
> own tables, but it in itself is not a KTable internal changelog.
>
> Per your example 1, let's name this topic "userEntity". It could be a
> (key,value) pair of (userId, emailAddress), where I only want the latest
> emailAddress (basic materialization) to send an email on account password
> update. I only want to run the application against one Kafka cluster, and
> because I don't want to use dual-ingest, I am running that application only
> on the cluster where the data is being sent (Primary Cluster). In a
> scenario where all replication is working correctly I could also run this
> off the Secondary cluster's replica, "primary.userEntity"
>
>
>
> *> Yes, that's something like "dual ingest", which I would not recommend.*
> Agreed. I do not want to use dual ingest.
>
> *> Secondary cluster:*
> *> Topics: events, primary.events, table-changelog*
> *> App subscription: events, primary.events*
> *> App output: table-changelog*
>
> Is the "events" topic dual ingest, since it exists in the Primary cluster
> with the exact same name?
>
> The whole scenario can be boiled down into the following:
> 1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
> 2) I want to publish it into an Active-Active 

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

Thanks for the clarifications! Here is one of my own, as I think it's the
biggest stumbling block in my description:

*> What is "table" exactly? I am interpreting this as a KTable changelog
topic*
"table" is not a KTable changelog topic, but simply entity data that is to
be materialized into a table - for example, relational data captured from
Kafka Connect. I should have named this "stateful-data" or something less
ambiguous and provided an explicit definition. Note that non-KStreams
applications will also regularly use this entity data to materialize their
own tables, but it in itself is not a KTable internal changelog.

Per your example 1, let's name this topic "userEntity". It could be a
(key,value) pair of (userId, emailAddress), where I only want the latest
emailAddress (basic materialization) to send an email on account password
update. I only want to run the application against one Kafka cluster, and
because I don't want to use dual-ingest, I am running that application only
on the cluster where the data is being sent (Primary Cluster). In a
scenario where all replication is working correctly I could also run this
off the Secondary cluster's replica, "primary.userEntity"



*> Yes, that's something like "dual ingest", which I would not recommend.*
Agreed. I do not want to use dual ingest.

*> Secondary cluster:*
*> Topics: events, primary.events, table-changelog*
*> App subscription: events, primary.events*
*> App output: table-changelog*

Is the "events" topic dual ingest, since it exists in the Primary cluster
with the exact same name?

The whole scenario can be boiled down into the following:
1) Entity data is in a userEntity topic, ie: (userId, emailAddress)
2) I want to publish it into an Active-Active cluster setup without using
dual-ingest
3) I want to materialize the data into a single table for an application
consuming from a single cluster (Kafka Streams or not)
4) I want to be able to fail over and rebuild the materialized state using
the data I have replicated.
- If all of the entity data is produced to each cluster (dual-ingest) then
it is trivial to fail over and rebuild the materialized table.
- If the data is only produced to Primary and only replicated to Secondary,
at a failover I would need to consume from the replicated topic.
*Q1) Where does the producer write its data to if the primary cluster
is dead?*
It seems to me that it must then write its data to the only
remaining cluster. This would then put the entity data in two topics as I
had originally outlined, as below:
*Secondary Cluster: (Live)   (renamed table to userEntity)*
  Topic: "primary.userEntity" (contains data from T = 0 to T = n)
  Topic: "userEntity" (contains data from T = n+1 to now, the
failed-over producer)


*Q2) How does a Kafka Streams application materialize state from two
topics? (loaded question, I know)*
  Since I know this isn't built in, is there some sort of technique
or system that you use to allow for a single virtual topic made up of many
logical topics?

*Q3) Do you have any recommendations on how to handle replication/producing
of entity-data (ie: userEntity) across multiple clusters, such that an
application may correctly (or even near-correctly) materialize state after
a failover like the one I described above?*
This is really the golden question. We're currently developing our
Active-Passive approach, but we want to be prepared for scenarios where we
have multiple clusters with entity-replication between clusters.


Thanks Ryanne!


On Tue, Jul 23, 2019 at 12:39 PM Ryanne Dolan  wrote:

> Adam,
>
> > I think we have inconsistent definitions of Active-Active
>
> Yes, this terminology gets thrown around a lot. IMO "active" means both
> producers and consumers are using a cluster under normal operation -- not
> just during outages, and not just by something like MM2. (Obviously, MM2
> has producers and consumers, but they don't count here.) Conversely,
> "standby" or "backup" means that data is being written by a producer, but
> it isn't being consumed under normal operation. I qualify this definition
> with IMO, as I don't think there is strong consensus here.
>
> I'll also add a caveat about "under normal operation". An active/active
> architecture does not necessarily mean that you use both clusters in the
> same way all the time -- only that you _could_. You could load-balance
> 50/50 of your traffic between two clusters, or you could direct 100% to one
> and 0% to the other, e.g. if one is farther away or has fewer hardware resources.
> But the architecture remains the same (and certainly, MM2 doesn't care
> about this detail).
>
> > The producer is only producing to one cluster (primary) and one topic
> (topic "table"), and the other cluster (secondary) contains only a
> replication of the data via MM2 ("primary.table").
>
> That, by definition, is not active/active.
>
> What you seemed to be proposing is that the producer's "table" 

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Ryanne Dolan
Adam,

> I think we have inconsistent definitions of Active-Active

Yes, this terminology gets thrown around a lot. IMO "active" means both
producers and consumers are using a cluster under normal operation -- not
just during outages, and not just by something like MM2. (Obviously, MM2
has producers and consumers, but they don't count here.) Conversely,
"standby" or "backup" means that data is being written by a producer, but
it isn't being consumed under normal operation. I qualify this definition
with IMO, as I don't think there is strong consensus here.

I'll also add a caveat about "under normal operation". An active/active
architecture does not necessarily mean that you use both clusters in the
same way all the time -- only that you _could_. You could load-balance
50/50 of your traffic between two clusters, or you could direct 100% to one
and 0% to the other, e.g. if one is farther away or has fewer hardware resources.
But the architecture remains the same (and certainly, MM2 doesn't care
about this detail).

> The producer is only producing to one cluster (primary) and one topic
(topic "table"), and the other cluster (secondary) contains only a
replication of the data via MM2 ("primary.table").

That, by definition, is not active/active.

> What you seemed to be proposing is that the producer's "table" data is
sent fully to each cluster, such that the state can be materialized as a
KTable in each application running on each cluster.

Correct.

> This wouldn't require MM2 at all, so I'm not sure if this is what you
advocated.

You could use a dual-ingest method and send all your data to both clusters,
which would not require MM2. There are many issues with this approach,
primarily wrt to consistency and efficiency.

> The trivial solution seems to be to make your producers produce all
stateful data (topic "table") to each cluster, which makes MM2 unnecessary,
but can also lead to data inconsistencies so it's not exactly foolproof.

Yes, that's something like "dual ingest", which I would not recommend.

> SteamsAppPrimary is consuming from ("table")

What is "table" exactly? I am interpreting this as a KTable changelog
topic, in which case "table" is an output topic of some streams app, i.e.
the app producing the change events. _This_ is the app I mean to suggest
you run on both clusters. Then, "table" will appear on both clusters (no
"primary.table").

The app that is creating the "table" changelog would be processing events
from some other topic, say "events". Then, this is what I recommend:

Primary cluster:
Topics: events, secondary.events, table-changelog
App subscription: events, secondary.events
App output: table-changelog

Secondary cluster:
Topics: events, primary.events, table-changelog
App subscription: events, primary.events
App output: table-changelog

With this arrangement, the app on either cluster will have built up state
in RocksDB based on events from both clusters.
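In code, each cluster's copy of the app could be as simple as this sketch (the reduce is
just a stand-in for whatever aggregation actually builds the table; remotePrefix is
"secondary" on the primary cluster and "primary" on the secondary one):

import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;

class TableChangelogApp {
    static StreamsBuilder build(String remotePrefix) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(Arrays.asList("events", remotePrefix + ".events"))
               .groupByKey()
               .reduce((oldVal, newVal) -> newVal) // stand-in for the real aggregation
               .toStream()
               .to("table-changelog");
        return builder;
    }
}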

Now, it seems you also want a second app to process this changelog. I can
see a few scenarios:

1) you want to take some external action based on records in the table
changelog, e.g. to send an email every time a password is updated. In this
case, you don't want this app running in both clusters, as you'd get two
emails. So you could run it in one cluster and use offset translation to
migrate during failover. The send-email app is stateless, so you just need
to translate and reset offsets (there is no internal state to rebuild).

2) you want to use the table changelog in a stateful but non-effecting way,
e.g. by keeping a running count of records. This app, like the first, can
be run in both clusters.

3) you want some combination of state and external actions in one big app.
In this case, I'd consider splitting your app in two so that you can build
state in both clusters while effecting external actions in only one cluster
at a time.

Lemme know if that makes sense.

Ryanne

On Tue, Jul 23, 2019 at 10:19 AM Adam Bellemare 
wrote:

> Hi Ryanne
>
> I think we have inconsistent definitions of Active-Active. The producer is
> only producing to one cluster (primary) and one topic (topic "table"), and
> the other cluster (secondary) contains only a replication of the data via
> MM2 ("primary.table"). What you seemed to be proposing is that the
> producer's "table" data is sent fully to each cluster, such that the state
> can be materialized as a KTable in each application running on each
> cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
> you advocated.
>
> You also state that "As with normal consumers, the Streams app should 
> *subscribe
> to any remote topics*, e.g. with a regex, s.t. the application state will
> reflect input from either source cluster.". Wouldn't this mean that the
> stateful "table" topic that we wish to materialize would be replicated by
> MM2 from Primary, such that we end up with the following:
>
> *Replicated Entity/Stateful Data:*
> *Primary Cluster: (Live)*
> Topic: "table" 

Re: KIP-382 + Kafka Streams Question

2019-07-23 Thread Adam Bellemare
Hi Ryanne

I think we have inconsistent definitions of Active-Active. The producer is
only producing to one cluster (primary) and one topic (topic "table"), and
the other cluster (secondary) contains only a replication of the data via
MM2 ("primary.table"). What you seemed to be proposing is that the
producer's "table" data is sent fully to each cluster, such that the state
can be materialized as a KTable in each application running on each
cluster. This wouldn't require MM2 at all, so I'm not sure if this is what
you advocated.

You also state that "As with normal consumers, the Streams app should
*subscribe
to any remote topics*, e.g. with a regex, s.t. the application state will
reflect input from either source cluster.". Wouldn't this mean that the
stateful "table" topic that we wish to materialize would be replicated by
MM2 from Primary, such that we end up with the following:

*Replicated Entity/Stateful Data:*
*Primary Cluster: (Live)*
Topic: "table" (contains data from T = 0 to T = n)
SteamsAppPrimary is consuming from ("table")

*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
SteamsAppSecondary is consuming from ("primary.table")

What does StreamsAppSecondary do when "primary.table" is no longer
replicated because Primary has died? Additionally, where should the
producer of topic "table" now write its data to, assuming that Primary
Cluster is irrevocably lost?

I hope this better outlines my scenario. The trivial solution seems to be
to make your producers produce all stateful data (topic "table") to each
cluster, which makes MM2 unnecessary, but can also lead to data
inconsistencies so it's not exactly foolproof.

Thanks

On Mon, Jul 22, 2019 at 6:32 PM Ryanne Dolan  wrote:

> Hello Adam, thanks for the questions. Yes my organization uses Streams,
> and yes you can use Streams with MM2/KIP-382, though perhaps not in the way
> you are describing.
>
> The architecture you mention is more "active/standby" than "active/active"
> IMO. The "secondary" cluster is not being used until a failure, at which
> point you migrate your app and expect the data to already be there. This
> works for normal consumers where you can seek() and --reset-offsets.
> Streams apps can be reset with the kafka-streams-application-reset tool,
> but as you point out, that doesn't help with rebuilding an app's internal
> state, which would be missing on the secondary cluster. (Granted, that may
> be okay depending on your particular application.)
>
> A true "active/active" solution IMO would be to run your same Streams app
> in _both_ clusters (primary, secondary), s.t. the entire application state
> is available and continuously updated in both clusters. As with normal
> consumers, the Streams app should subscribe to any remote topics, e.g. with
> a regex, s.t. the application state will reflect input from either source
> cluster.
>
> This is essentially what Streams' "standby replicas" are -- extra copies
> of application state to support quicker failover. Without these replicas,
> Streams would need to start back at offset 0 and re-process everything in
> order to rebuild state (which you don't want to do during a disaster,
> especially!). The same logic applies to using Streams with MM2. You _could_
> failover by resetting the app and rebuilding all the missing state, or you
> could have a copy of everything sitting there ready when you need it. The
> easiest way to do the latter is to run your app in both clusters.
>
> Hope that helps.
>
> Ryanne
>
> On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
> wrote:
>
>> Hi Ryanne
>>
>> I have a quick question for you about Active+Active replication and Kafka
>> Streams. First, does your org / do you use Kafka Streams? If not then I
>> think this conversation can end here. ;)
>>
>> Secondly, and for the broader Kafka Dev group - what happens if I want to
>> use Active+Active replication with my Kafka Streams app, say, to
>> materialize a simple KTable? Based on my understanding, a topic "table" on
>> the primary cluster will be replicated to the secondary cluster as
>> "primary.table". In the case of a full cluster failure for primary, the
>> producer to topic "table" on the primary switches over to the secondary
>> cluster, creates its own "table" topic and continues to write to there. So
>> now, assuming we have had no data loss, we end up with:
>>
>>
>> *Primary Cluster: (Dead)*
>>
>>
>> *Secondary Cluster: (Live)*
>> Topic: "primary.table" (contains data from T = 0 to T = n)
>> Topic: "table" (contains data from T = n+1 to now)
>>
>> If I want to materialize state using Kafka Streams, obviously I am
>> now in a bit of a pickle since I need to consume "primary.table" before I
>> consume "table". Have you encountered rebuilding state in Kafka Streams
>> using Active-Active? For non-Kafka Streams I can see using a single
>> consumer for "primary.table" and one for "table", interleaving the
>> timestamps and performing basic event 

Re: KIP-382 + Kafka Streams Question

2019-07-22 Thread Ryanne Dolan
Hello Adam, thanks for the questions. Yes my organization uses Streams, and
yes you can use Streams with MM2/KIP-382, though perhaps not in the way you
are describing.

The architecture you mention is more "active/standby" than "active/active"
IMO. The "secondary" cluster is not being used until a failure, at which
point you migrate your app and expect the data to already be there. This
works for normal consumers where you can seek() and --reset-offsets.
Streams apps can be reset with the kafka-streams-application-reset tool,
but as you point out, that doesn't help with rebuilding an app's internal
state, which would be missing on the secondary cluster. (Granted, that may
be okay depending on your particular application.)

A true "active/active" solution IMO would be to run your same Streams app
in _both_ clusters (primary, secondary), s.t. the entire application state
is available and continuously updated in both clusters. As with normal
consumers, the Streams app should subscribe to any remote topics, e.g. with
a regex, s.t. the application state will reflect input from either source
cluster.

This is essentially what Streams' "standby replicas" are -- extra copies of
application state to support quicker failover. Without these replicas,
Streams would need to start back at offset 0 and re-process everything in
order to rebuild state (which you don't want to do during a disaster,
especially!). The same logic applies to using Streams with MM2. You _could_
failover by resetting the app and rebuilding all the missing state, or you
could have a copy of everything sitting there ready when you need it. The
easiest way to do the latter is to run your app in both clusters.
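As a sketch, the two deployments can share all code and differ only in bootstrap.servers
(the application id and addresses below are made up); num.standby.replicas is the knob for
the in-cluster standby state mentioned above:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class PerClusterStreamsConfig {
    static Properties forCluster(String bootstrapServers) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // e.g. primary:9092 or secondary:9092
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);             // warm standby state within a cluster
        return props;
    }
}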

Hope that helps.

Ryanne

On Mon, Jul 22, 2019 at 3:11 PM Adam Bellemare 
wrote:

> Hi Ryanne
>
> I have a quick question for you about Active+Active replication and Kafka
> Streams. First, does your org / do you use Kafka Streams? If not then I
> think this conversation can end here. ;)
>
> Secondly, and for the broader Kafka Dev group - what happens if I want to
> use Active+Active replication with my Kafka Streams app, say, to
> materialize a simple KTable? Based on my understanding, a topic "table" on
> the primary cluster will be replicated to the secondary cluster as
> "primary.table". In the case of a full cluster failure for primary, the
> producer to topic "table" on the primary switches over to the secondary
> cluster, creates its own "table" topic and continues to write to there. So
> now, assuming we have had no data loss, we end up with:
>
>
> *Primary Cluster: (Dead)*
>
>
> *Secondary Cluster: (Live)*
> Topic: "primary.table" (contains data from T = 0 to T = n)
> Topic: "table" (contains data from T = n+1 to now)
>
> If I want to materialize state using Kafka Streams, obviously I am
> now in a bit of a pickle since I need to consume "primary.table" before I
> consume "table". Have you encountered rebuilding state in Kafka Streams
> using Active-Active? For non-Kafka Streams I can see using a single
> consumer for "primary.table" and one for "table", interleaving the
> timestamps and performing basic event dispatching based on my own tracked
> stream-time, but for Kafka Streams I don't think there exists a solution to
> this.
>
> If you have any thoughts on this or some recommendations for Kafka Streams
> with Active-Active I would be very appreciative.
>
> Thanks
> Adam
>
>
>


KIP-382 + Kafka Streams Question

2019-07-22 Thread Adam Bellemare
Hi Ryanne

I have a quick question for you about Active+Active replication and Kafka
Streams. First, does your org / do you use Kafka Streams? If not then I
think this conversation can end here. ;)

Secondly, and for the broader Kafka Dev group - what happens if I want to
use Active+Active replication with my Kafka Streams app, say, to
materialize a simple KTable? Based on my understanding, a topic "table" on
the primary cluster will be replicated to the secondary cluster as
"primary.table". In the case of a full cluster failure for primary, the
producer to topic "table" on the primary switches over to the secondary
cluster, creates its own "table" topic and continues to write to there. So
now, assuming we have had no data loss, we end up with:


*Primary Cluster: (Dead)*


*Secondary Cluster: (Live)*
Topic: "primary.table" (contains data from T = 0 to T = n)
Topic: "table" (contains data from T = n+1 to now)

If I want to materialize state using Kafka Streams, obviously I am now
in a bit of a pickle since I need to consume "primary.table" before I
consume "table". Have you encountered rebuilding state in Kafka Streams
using Active-Active? For non-Kafka Streams I can see using a single
consumer for "primary.table" and one for "table", interleaving the
timestamps and performing basic event dispatching based on my own tracked
stream-time, but for Kafka Streams I don't think there exists a solution to
this.

If you have any thoughts on this or some recommendations for Kafka Streams
with Active-Active I would be very appreciative.

Thanks
Adam


Re: Kafka Streams question

2016-07-14 Thread Michael Noll
Also, in the next version of Kafka / Kafka Streams such "intermediate"
topics will automatically be created for you when you do joins or
aggregations:

https://issues.apache.org/jira/browse/KAFKA-3561

So my previous message explained your options today when using the current
release of Kafka Streams (v0.10.0.0).

-Michael




On Thu, Jul 14, 2016 at 10:32 AM, Michael Noll  wrote:

> Poul,
>
> to add to what Matthias said:  If you are wondering how to manually create
> a topic, you have basically two options.
>
> A. Use Kafka's CLI tools to create the topic "from the outside".
>
> # Example
> $ kafka-topics.sh --create --topic my-custom-topic --zookeeper
> localhost:2181 --partitions 1 --replication-factor 1
>
> B. Use Kafka's API to programmatically create the topic.  See [1] for an
> example.
>
> Question for you to learn how we could perhaps improve the status quo:
>  How would you have expected this to work in the current Kafka Streams
> API?  For example, would you have expected that, say, the `through()`
> method would accept parameters to specify the number of partitions?
>
>
> Hope this helps,
> Michael
>
> [1]
> https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160
>
>
>
>
> On Thu, Jul 14, 2016 at 10:08 AM, Matthias J. Sax 
> wrote:
>
>> Hi,
>>
>> you can manually create a topic with the number of partitions you want
>> to have and use this topic via through()
>>
>> KStream input = ...
>>
>> input.map().through("manually-created-topic").join(...)
>>
>> However, both KStream and KTable need to have the same number of
>> partitions to perform the join. Thus, you might need to create a topic
>> (with the same number of partitions) for the table, too.
>>
>> See
>>
>> http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams
>>
>>
>> -Matthias
>>
>> On 07/13/2016 11:59 PM, Poul Costinsky wrote:
>> > Hi! I am prototyping some code using Kafka Streams, and have a
>> question. I need to map a stream into another (with different partition
>> key) and join it with a table. How do I control number of partitions of the
>> mapped stream?
>> >
>> > Thanks!
>> >
>> > Poul Costinsky
>> > Chief Architect
>> >
>> >  
>> > (360) 207-1753 
>> >
>> >
>> >
>> >
>> >
>>
>>
>
>
> --
>
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860*
> Download Apache Kafka and Confluent Platform: www.confluent.io/download
>



-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860*
Download Apache Kafka and Confluent Platform: www.confluent.io/download


Re: Kafka Streams question

2016-07-14 Thread Michael Noll
Poul,

to add to what Matthias said:  If you are wondering how to manually create
a topic, you have basically two options.

A. Use Kafka's CLI tools to create the topic "from the outside".

# Example
$ kafka-topics.sh --create --topic my-custom-topic --zookeeper
localhost:2181 --partitions 1 --replication-factor 1

B. Use Kafka's API to programmatically create the topic.  See [1] for an
example.

Question for you to learn how we could perhaps improve the status quo:  How
would you have expected this to work in the current Kafka Streams API?  For
example, would you have expected that, say, the `through()` method would
accept parameters to specify the number of partitions?


Hope this helps,
Michael

[1]
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160
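(For completeness, a minimal sketch of option B using the AdminClient API from later Kafka
releases -- it did not exist yet in the 0.10.0.0 release discussed here -- with the partition
count and replication factor mirroring the CLI example above.)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1, as in the CLI example
            NewTopic topic = new NewTopic("my-custom-topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}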




On Thu, Jul 14, 2016 at 10:08 AM, Matthias J. Sax 
wrote:

> Hi,
>
> you can manually create a topic with the number of partitions you want
> to have and use this topic via through()
>
> KStream input = ...
>
> input.map().through("manually-created-topic").join(...)
>
> However, both KStream and KTable need to have the same number of
> partitions to perform the join. Thus, you might need to create a topic
> (with the same number of partitions) for the table, too.
>
> See
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams
>
>
> -Matthias
>
> On 07/13/2016 11:59 PM, Poul Costinsky wrote:
> > Hi! I am prototyping some code using Kafka Streams, and have a question.
> I need to map a stream into another (with different partition key) and join
> it with a table. How do I control number of partitions of the mapped stream?
> >
> > Thanks!
> >
> > Poul Costinsky
> > Chief Architect
> >
> >  
> > (360) 207-1753 
> >
> >
> >
> >
> >
>
>


-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860*
Download Apache Kafka and Confluent Platform: www.confluent.io/download


Re: Kafka Streams question

2016-07-14 Thread Matthias J. Sax
Hi,

you can manually create a topic with the number of partitions you want
to have and use this topic via through()

KStream input = ...

input.map().through("manually-created-topic").join(...)

However, both KStream and KTable need to have the same number of
partitions to perform the join. Thus, you might need to create a topic
(with the same number of partitions) for the table, too.

See
http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams
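Spelled out a little more, a rough sketch against the 0.10.0-era API (topic names and the
re-keying logic are made up; "rekeyed-topic" must be created up front with the same
partition count as "table-topic"):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

class RekeyAndJoin {
    static KStream<String, String> build(KStreamBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic");

        KStream<String, String> rekeyed = input
            .map((key, value) -> KeyValue.pair(value.substring(0, 1), value)) // re-key by some attribute of the value
            .through("rekeyed-topic"); // manually created, same partition count as the table's topic

        KTable<String, String> table = builder.table("table-topic");
        return rekeyed.join(table, (left, right) -> left + "|" + right);
    }
}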


-Matthias

On 07/13/2016 11:59 PM, Poul Costinsky wrote:
> Hi! I am prototyping some code using Kafka Streams, and have a question. I 
> need to map a stream into another (with different partition key) and join it 
> with a table. How do I control number of partitions of the mapped stream?
> 
> Thanks! 
> 
> Poul Costinsky
> Chief Architect
> 
>  
> (360) 207-1753 
> 
> 
> 
> 
> 





Kafka Streams question

2016-07-13 Thread Poul Costinsky
Hi! I am prototyping some code using Kafka Streams, and have a question. I need 
to map a stream into another (with different partition key) and join it with a 
table. How do I control number of partitions of the mapped stream?

Thanks! 

Poul Costinsky
Chief Architect

 
(360) 207-1753