Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2018-01-16 Thread Dmitry Minkovsky
> Thus, only the left/outer KStream-KStream and KStream-KTable joins have some
runtime dependencies. For more details about joins, check out this blog
post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

So I am trying to reprocess a topology and seem to have encountered this.
I posted my question to
https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly.
I fear that this will not be something I can work around :(


Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2017-12-10 Thread Dmitry Minkovsky
Matthias,

Thank you for your detailed response.

Yes—of course I can use the record timestamp when copying from topic to
topic. For some reason that always slips my mind.

> This will always be computed correctly, even if both records are not in
the buffer at the same time :)

This is music to my ears! I will review the blog post you sent.

Thank you again. And for your work on this incredible software.

Dmitry





Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2017-12-09 Thread Matthias J. Sax
About timestamps: embedding timestamps in the payload itself is not
really necessary IMHO. Each record has a metadata timestamp that provides
the exact same semantics. If you just copy data from one topic to
another, the timestamp can be preserved (using a plain consumer/producer
and setting the timestamp of the input record explicitly as the timestamp
of the output record). For Streams, it could be that "some" timestamps
get altered because we apply slightly different timestamp inference
logic, but there are plans to improve this inference so that the
timestamps are preserved exactly in Streams, too.
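
For illustration, here is a rough sketch of such a copy job with the plain
Java consumer/producer, carrying the source record's timestamp over
explicitly. Topic names and configs are placeholders, and the partition is
left to the producer's default partitioner, so the target topic may have a
different partition count:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TimestampPreservingCopier {
        public static void main(String[] args) {
            Properties cProps = new Properties();
            cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-copier");
            cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            Properties pProps = new Properties();
            pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(cProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                 KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(pProps, new ByteArraySerializer(), new ByteArraySerializer())) {

                consumer.subscribe(Collections.singletonList("old-prefix.input"));
                while (true) {
                    for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(1000)) {
                        // Copy key and value as raw bytes and set the source record's
                        // timestamp explicitly on the output record; the partition is
                        // null so the default partitioner re-partitions by key.
                        producer.send(new ProducerRecord<>("new-prefix.input", null,
                            rec.timestamp(), rec.key(), rec.value()));
                    }
                }
            }
        }
    }

Preserving the key (rather than the source partition number) is what matters
here, since the new topic may be partitioned differently.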

With regard to flow control: it depends on the operators you use. Some
are fully deterministic, others have some runtime dependencies. Fully
deterministic are all aggregations (non-windowed and windowed), as well
as the inner KStream-KStream join and all variants (inner/left/outer) of
the KTable-KTable join.

> If the consumer reads P2 before P1, will the task still
> properly align these two records given their timestamps for the correct
> inner join, assuming both records are within the record buffer?

This will always be computed correctly, even if both records are not in
the buffer at the same time :)


Thus, only the left/outer KStream-KStream and KStream-KTable joins have some
runtime dependencies. For more details about joins, check out this blog
post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Btw: we are aware of some weaknesses in the current implementation, and
it's on our roadmap to strengthen our guarantees, also with regard to
the internally used record buffer, time management in general, and
operator semantics.

Note though: Kafka guarantees offset-based ordering, not
timestamp-based ordering, and thus Kafka Streams also processes records
in offset order. This implies that records might be out-of-order with
regard to their timestamps, but our operators are implemented to handle
this case correctly (minus some known issues, as mentioned above, that we
are going to fix in future releases).


Stateless: I mean a program that only uses stateless operators like
filter/map and no aggregations/joins.
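
A minimal sketch of what I mean by a stateless program (only
filter/mapValues, so no state stores and no internal changelog/repartition
topics; topic names and serde settings are placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class StatelessExample {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> events = builder.stream("events");

            // Record-at-a-time, stateless operators only: each record is
            // processed independently, so the partition count can change freely.
            events.filter((key, value) -> value != null && !value.isEmpty())
                  .mapValues(value -> value.toUpperCase())
                  .to("events-filtered");

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            new KafkaStreams(builder.build(), props).start();
        }
    }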



-Matthias



Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2017-12-09 Thread Dmitry Minkovsky
> How large is the record buffer? Is it configurable?

I seem to have just discovered the answer to this:
buffered.records.per.partition
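
For my own reference, a sketch of how I would set it, assuming the
StreamsConfig constant maps to that property (the default appears to be
1000; the value here is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class BufferConfigSketch {
        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Max records buffered per input partition; once a partition's buffer
            // is full, Streams pauses fetching from it (default appears to be 1000).
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10000);
            return props;
        }
    }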


Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2017-12-09 Thread Dmitry Minkovsky
Hi Matthias, yes that definitely helps. A few thoughts inline below.

Thank you!

On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax 
wrote:

> Hard to give a generic answer.
>
> 1. We recommend over-partitioning your input topics to start with (to
> avoid that you need to add new partitions later on); problem avoidance
> is the best strategy. There will be some overhead for this obviously on
> the broker side, but it's not too big.
>

Yes, I will definitely be doing this.


>
> 2. Not sure why you would need a new cluster? You can just create a new
> topic in the same cluster and let Kafka Streams read from there.
>

Motivated by fear of disturbing/manipulating a production cluster and the
relative ease of putting up a new cluster. Perhaps that fear is irrational.
I could alternatively just prefix topics.


>
> 3. Depending on your state requirements, you could also run two
> applications in parallel -- the new one reads from the new input topic
> with more partitions, and you configure your producer to write to the new
> topic (or maybe even dual-write to both). Once your new application has
> ramped up, you can stop the old one.
>

Yes, this is my plan for migrations. If I could run it past you:

(i) Write input topics from the old prefix to the new prefix.
(ii) Start the new Kafka Streams application against the new prefix.
(iii) When the two applications are in sync, stop writing to the old topics

Since I will be copying from an old prefix to a new prefix, it seems
essential here to have timestamps embedded in the data records, along with a
custom timestamp extractor.
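
Roughly the kind of extractor I have in mind (a sketch only; the
Timestamped interface is a placeholder of mine for whatever the value serde
deserializes to, and the class would be registered via the
default.timestamp.extractor config, or timestamp.extractor on older
versions):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class EmbeddedTimestampExtractor implements TimestampExtractor {

        // Placeholder: whatever type the value serde deserializes to, as long
        // as it exposes the event time embedded in the payload.
        public interface Timestamped {
            long timestampMillis();
        }

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            Object value = record.value();
            if (value instanceof Timestamped) {
                long ts = ((Timestamped) value).timestampMillis();
                if (ts >= 0) {
                    return ts;
                }
            }
            // Fall back to the record's own metadata timestamp.
            return record.timestamp();
        }
    }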

I really wish I could get some more flavor on "Flow Control With Timestamps"
in this regard. Assuming my timestamps are monotonically increasing within
each input topic, from my reading of that section it still appears that the
result of reprocessing input topics is non-deterministic beyond the
"records in its stream record buffer". Some seemingly crucial sentences:

> This flow control is best-effort because it is not always possible to
strictly enforce execution order across streams by record timestamp; in
fact, in order to enforce strict execution ordering, one must either wait
until the system has received all the records from all streams (which may
be quite infeasible in practice) or inject additional information about
timestamp boundaries or heuristic estimates such as MillWheel’s watermarks.


Practically, how am I to understand this? How large is the record buffer?
Is it configurable?

For example, suppose I am re-processing an inner join on partitions P1
(left) and P2 (right). In the original processing, record K1V1T1 was
recorded onto P1, then some time later record K1V2T2 was recorded onto P2.
As a result, K1V2T2 was joined with K1V1T1. Now, during re-processing, P1
and P2 contain historical data and the Kafka Streams consumers can read P2
before P1. If the consumer reads P2 before P1, will the task still properly
align these two records given their timestamps for the correct inner join,
assuming both records are within the record buffer? I've experimented with
this, but unfortunately I didn't have time to really set up good
experiments to satisfy myself.
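
For concreteness, the shape of the join I have been experimenting with is
roughly this (a sketch with placeholder topic names, an arbitrary 5-minute
join window, and default String serdes assumed):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;

    public class InnerJoinSketch {
        static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> left = builder.stream("left-topic");
            KStream<String, String> right = builder.stream("right-topic");

            // Inner KStream-KStream join: records with the same key pair up as
            // long as their timestamps fall within the join window, whichever
            // partition happens to be read first.
            left.join(right,
                      (leftValue, rightValue) -> leftValue + "+" + rightValue,
                      JoinWindows.of(TimeUnit.MINUTES.toMillis(5)))
                .to("joined-topic");

            return builder.build();
        }
    }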


> 4. If you really need to add new partitions, you need to fix up all
> topics manually -- including all topics Kafka Streams created for you.
> Adding partitions messes up all your state sharding, as the key-based
> partitioning changes. This implies that your application must be stopped!
> Thus, if you have zero-downtime requirements, you can't do this at all.
>
> 5. If you have a stateless application all those issues go away though
> and you can even add new partitions during runtime.
>
>
Stateless in what sense? Kafka Streams seems to be all about aligning and
manipulating state to create more state. Are you referring to internal
state, specifically?



>
> Hope this helps.
>
>
> -Matthias
>
>
>
> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
> > I am about to put a topology into production and I am concerned that I
> > don't know how to repartition/rebalance the topics in the event that I
> need
> > to add more partitions.
> >
> > My inclination is that I should spin up a new cluster and run some kind
> of
> > consumer/producer combination that takes data from the previous cluster
> and
> > writes it to the new cluster. A new instance of the Kafka Streams
> > application then works against this new cluster. But I'm not sure how to
> > best execute this, or whether this approach is sound at all. I am
> imagining
> > many things may go wrong. Without going into further speculation, what is
> > the best way to do this?
> >
> > Thank you,
> > Dmitry
> >
>
>


Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2017-12-08 Thread Matthias J. Sax
Hard to give a generic answer.

1. We recommend over-partitioning your input topics to start with (to
avoid that you need to add new partitions later on); problem avoidance
is the best strategy. There will be some overhead for this obviously on
the broker side, but it's not too big. (A sketch for creating such a
topic follows after point 5.)

2. Not sure why you would need a new cluster? You can just create a new
topic in the same cluster and let Kafka Streams read from there.

3. Depending on your state requirements, you could also run two
applications in parallel -- the new one reads from the new input topic
with more partitions, and you configure your producer to write to the new
topic (or maybe even dual-write to both). Once your new application has
ramped up, you can stop the old one.

4. If you really need to add new partitions, you need to fix up all
topics manually -- including all topics Kafka Streams created for you.
Adding partitions messes up all your state sharding, as the key-based
partitioning changes. This implies that your application must be stopped!
Thus, if you have zero-downtime requirements, you can't do this at all.

5. If you have a stateless application all those issues go away though
and you can even add new partitions during runtime.
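
Regarding point 1, here is a sketch of creating such an over-partitioned
input topic with the AdminClient. Topic name, partition count, and
replication factor are placeholders; pick numbers that fit your expected
growth:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateOverPartitionedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // More partitions than currently needed, so the key-based
                // partitioning never has to change later on.
                NewTopic input = new NewTopic("my-input-topic", 50, (short) 3);
                admin.createTopics(Collections.singletonList(input)).all().get();
            }
        }
    }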


Hope this helps.


-Matthias



On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
> I am about to put a topology into production and I am concerned that I
> don't know how to repartition/rebalance the topics in the event that I need
> to add more partitions.
> 
> My inclination is that I should spin up a new cluster and run some kind of
> consumer/producer combination that takes data from the previous cluster and
> writes it to the new cluster. A new instance of the Kafka Streams
> application then works against this new cluster. But I'm not sure how to
> best execute this, or whether this approach is sound at all. I am imagining
> many things may go wrong. Without going into further speculation, what is
> the best way to do this?
> 
> Thank you,
> Dmitry
> 





How can I repartition/rebalance topics processed by a Kafka Streams topology?

2017-12-08 Thread Dmitry Minkovsky
I am about to put a topology into production and I am concerned that I
don't know how to repartition/rebalance the topics in the event that I need
to add more partitions.

My inclination is that I should spin up a new cluster and run some kind of
consumer/producer combination that takes data from the previous cluster and
writes it to the new cluster. A new instance of the Kafka Streams
application then works against this new cluster. But I'm not sure how to
best execute this, or whether this approach is sound at all. I am imagining
many things may go wrong. Without going into further speculation, what is
the best way to do this?

Thank you,
Dmitry