KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-19 Thread Peter Levart


I'm trying to use Kafka Streams to aggregate some time series data using 
1-second tumbling time windows. The data is ordered approximately by 
timestamp with some "jitter", which I limit at the input with a custom 
TimestampExtractor that moves events into the future if they come in too 
late, guaranteeing that the timestamp of each event never jumps back 
more than 4 seconds relative to the most recent previous event timestamp. 
I then give the tumbling windows a grace period of 5 seconds...
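
A minimal, standalone sketch of the clamping rule described above, in plain Java rather than as a Kafka Streams TimestampExtractor. The class and constant names (JitterClamp, MAX_BACKWARD_JUMP_MS) are made up for illustration; only the arithmetic is taken from the extractor in the sample code of this message.

```java
final class JitterClamp {
    // Assumed bound from the description above: 4 seconds, in milliseconds.
    static final long MAX_BACKWARD_JUMP_MS = 4_000L;

    // Returns the timestamp to assign to an event, given the timestamp it
    // carries (eventTs) and the most recent timestamp seen so far (prevTs).
    // An event older than prevTs - 4 s is moved forward to prevTs - 4 s.
    static long clamp(long eventTs, long prevTs) {
        return Math.max(eventTs, Math.max(0L, prevTs - MAX_BACKWARD_JUMP_MS));
    }

    public static void main(String[] args) {
        System.out.println(clamp(10_000L, 9_000L)); // newer than prevTs: kept as is
        System.out.println(clamp(7_000L, 10_000L)); // 3 s late: kept as is
        System.out.println(clamp(3_000L, 10_000L)); // 7 s late: moved forward
    }
}
```

With this rule an event can lag the previous timestamp by at most 4 seconds, which is what leaves a 1-second margin under the 5-second grace period.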

Here's a sample kafka streams processor:

KStream input 
=builder.stream(inputTopic,Consumed.with(Serdes.String(), new 
Val.Serde()).withTimestampExtractor((rec, prevTs) -> {String key = 
(String) rec.key();Val val = (Val) rec.value();return 
Math.max(val.getTimestamp(), Math.max(0L, prevTs - 
4000));}));KStream, IntegerList> grouped 
v, list) -> {list.add(v.getValue());return 
list;},Materialized.with(Serdes.String(), new 
SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde()));

I'm using KTable.suppress with Suppressed.untilWindowCloses to suppress 
all but final versions of aggregations. This works as expected and I 
only get one final result per grouping key and window instance in the 
output topic. But this only works as expected and advertised until I 
restart the kafka streams process during the course of aggregating the 
events. After restart, I can see some non-final versions of aggregations 
in the output topic followed by final versions. So the guarantee 
advertised by Suppressed.untilWindowCloses() which says:

/"This option is suitable for use cases in which the business logic 
requires a hard guarantee that only the final result is propagated."/

...is only true when the kafka streams process is not restarted. Is this 
expected behavior or maybe a bug?


Peter Levart

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-19 Thread Peter Levart
I see the list processor managed to smash my beautifully formatted HTML 
message. For that reason I'm re-sending the sample code snippet in plain 
text mode...

 Here's a sample kafka streams processor:

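    KStream<String, Val> input = builder.stream(
        inputTopic,
        Consumed.with(Serdes.String(), new Val.Serde())
            .withTimestampExtractor((rec, prevTs) -> {
                String key = (String) rec.key();
                Val val = (Val) rec.value();
                return Math.max(val.getTimestamp(), Math.max(0L, prevTs - 4000));
            }));

    KStream<Windowed<String>, IntegerList> grouped = input
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
            .advanceBy(Duration.ofSeconds(1))
            .grace(Duration.ofSeconds(5)))
        .aggregate(
            IntegerList::new,
            (k, v, list) -> {
                list.add(v.getValue());
                return list;
            },
            Materialized.with(Serdes.String(), new IntegerList.Serde()))
        .suppress(Suppressed.untilWindowCloses(new StrictBufferConfigImpl()))
        .toStream();

    grouped.to(
        outputTopic,
        Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
                      new IntegerList.Serde()));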


Regards, Peter

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-20 Thread Guozhang Wang
Hello Peter,

Thanks for filing this report. I've looked into the source code and I think
I may have spotted an edge case related to your observations. To validate
whether my suspicion is correct, could you try modifying your DSL code a
little bit to use a very large suppression buffer size? (BTW,
StrictBufferConfigImpl is an internal class, as you can tell by its name,
and is not recommended for use in your code.) More specifically:



and see if this issue still exists?


On Wed, Dec 19, 2018 at 1:50 PM Peter Levart wrote:

> I see the list processor managed to smash my beautifully formatted HTML
> message. For that reason I'm re-sending the sample code snippet in plain
> text mode...
>
> Here's a sample kafka streams processor:
>
>     KStream<String, Val> input =
>         builder
>             .stream(
>                 inputTopic,
>                 Consumed.with(Serdes.String(), new Val.Serde())
>                     .withTimestampExtractor((rec, prevTs) -> {
>                         String key = (String) rec.key();
>                         Val val = (Val) rec.value();
>                         return Math.max(val.getTimestamp(),
>                                         Math.max(0L, prevTs - 4000));
>                     })
>             );
>
>     KStream<Windowed<String>, IntegerList> grouped =
>         input
>             .groupByKey()
>             .windowedBy(
>                 TimeWindows.of(Duration.ofSeconds(1))
>                     .advanceBy(Duration.ofSeconds(1))
>                     .grace(Duration.ofSeconds(5))
>             )
>             .aggregate(
>                 IntegerList::new,
>                 (k, v, list) -> {
>                     list.add(v.getValue());
>                     return list;
>                 },
>                 Materialized.with(Serdes.String(), new IntegerList.Serde())
>             )
>             .suppress(
>                 Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
>             )
>             .toStream();
>
>     grouped.to(
>         outputTopic,
>         Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
>                       new IntegerList.Serde())
>     );
>
> Regards, Peter

-- Guozhang

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-21 Thread Peter Levart

Hello Guozhang,

Thank you for looking into this problem.

I noticed that I have been using an internal class constructor and later 
discovered the right API to create the StrictBufferConfig 
implementations. But I'm afraid that using your proposed factory method 
won't change anything since its implementation is as follows:

    static StrictBufferConfig unbounded() {
        return new StrictBufferConfigImpl();
    }

...it creates an instance of the same class as my sample code does, so 
the program behaves the same...

What does this mean? Was your suggestion meant to rule out any other 
possible causes, so that your suspicion still holds, or did you suspect 
that I was not using a suppression buffer of sufficient size?

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-21 Thread Peter Levart

Hello Guozhang,

May I just add some more observations which might help you pin-point the 
problem...

When the process that runs the kafka streams processing threads is 
restarted, I can see duplicates in the output topic. But that is 
understandable for "at least once semantics" and I don't mind if there 
are duplicates if they are duplicates of final results of window 
aggregations. My logic is prepared for that. But I also see some results 
that are actual non-final window aggregations that precede the final 
aggregations. These non-final results are never emitted out of order 
(for example, no such non-final result would ever come after the final 
result for a particular key/window).

For example, here are some log fragments of a sample consumption of the 
output topic where I detect either duplicates or "incremental updates" 
of some key/window and mark them with "INSTEAD OF" words. I only show 
incremental updates here:

[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 
272, 548, 172], sum: 138902
[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 
272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164


[pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> [681, 
116, 542, 543, 0, 0, 0, 0], sum: 143046
[pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> [681, 
116, 542, 543, 0, 0, 0, 0, 0, 0, 0, 0] INSTEAD OF [681, 116, 542, 543, 
0, 0, 0, 0], sum: 143046

The rule seems to be that the non-final result almost always immediately 
precedes the final result in the log. I say almost, because I also saw 
one occurrence of the following:

[pool-1-thread-3] APP Consumed: [b@1545398878000/1545398880000] -> [756, 
454, 547, 300, 323], sum: 166729
[pool-1-thread-3] APP Consumed: [b@1545398880000/1545398882000] -> [193, 
740, 660, 981], sum: 169303
[pool-1-thread-3] APP Consumed: [b@1545398878000/1545398880000] -> [756, 
454, 547, 300, 323, 421, 378, 354, 0] INSTEAD OF [756, 454, 547, 300, 
323], sum: 170456
[pool-1-thread-3] APP Consumed: [b@1545398880000/1545398882000] -> [193, 
740, 660, 981, 879, 209, 104, 0, 0, 0] INSTEAD OF [193, 740, 660, 981], 
sum: 171648

Here the incremental update of the key/window happened for two 
consecutive 2 second windows in close succession and the results were 

What you see in the above log before the window start/end timestamps is 
a String key which is used in groupByKey (a, b, c, d). The input and 
output topics have 4 partitions and I use 4 stream processing threads...

Hope this helps you find the problem.

So could this be considered a bug? I don't know how this suppression is 
supposed to work, but it seems that it does not use any persistent 
storage for the suppression buffer. So after the stream processing 
process is restarted, it starts with a fresh buffer. What mechanism is 
used to guarantee that, in spite of that, suppress(untilWindowCloses) 
suppresses non-final results?

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-25 Thread Peter Levart

Hello Guozhang,

Just wanted to say that I have managed to come up with a different 
solution that doesn't have these problems.

Instead of doing the following:

    Materialized.with(keySerde, resultValueSerde)

... I used a custom Transformer:


... with some help from a persistent WindowStore:

    // create store
    StoreBuilder> windowStoreBuilder =
    false // don't allow duplicates

    // register store

Here's the GroupByKeyWindowedTransformer implementation:

    /**
     * @author Peter Levart
     */
    public class GroupByKeyWindowedTransformer<K, V, VR> implements
        Transformer<K, V, KeyValue<Windowed<K>, VR>> {

        public static <K, V, VR> TransformerSupplier<K, V, KeyValue<Windowed<K>, VR>> supplier(
            TimeWindows windows,
            String windowStoreName,
            Initializer<VR> initializer,
            Aggregator<K, V, VR> aggregator
        ) {
            VR zeroRes = initializer.apply();
            return () -> new GroupByKeyWindowedTransformer<>(
                windows, windowStoreName, initializer, aggregator, zeroRes
            );
        }

        private final TimeWindows windows;
        private final String windowStoreName;
        private final Initializer<VR> initializer;
        private final Aggregator<K, V, VR> aggregator;
        private final VR zeroRes;

        private GroupByKeyWindowedTransformer(
            TimeWindows windows,
            String windowStoreName,
            Initializer<VR> initializer,
            Aggregator<K, V, VR> aggregator,
            VR zeroRes
        ) {
            this.windows = windows;
            this.windowStoreName = windowStoreName;
            this.initializer = initializer;
            this.aggregator = aggregator;
            this.zeroRes = zeroRes;
        }

        private ProcessorContext context;
        private WindowStore<K, VR> store;

        public void init(ProcessorContext context) {
            this.context = context;
            this.store = (WindowStore<K, VR>) context.getStateStore(windowStoreName);
        }

        public KeyValue<Windowed<K>, VR> transform(K key, V value) {
            long ts = context.timestamp();

            // aggregate into windows
            for (TimeWindow tw : windows.windowsFor(ts).values()) {
                VR res = store.fetch(key, tw.start());
                if (!zeroRes.equals(res)) { // not flushed yet (see below)
                    if (res == null) res = initializer.apply();
                    res = aggregator.apply(key, value, res);
                    assert !zeroRes.equals(res);
                    store.put(key, res, tw.start());
                }
            }

            // flush windows that are overdue (startTime < this event time
            // minus window size minus grace period)
            try (WindowStoreIterator<VR> iter = store.fetch(
                key, 0L, ts - windows.size() - windows.gracePeriodMs() - 1L)) {

                while (iter.hasNext()) {
                    KeyValue<Long, VR> kv = iter.next();
                    if (kv.value != null && !zeroRes.equals(kv.value)) {
                        TimeWindow tw = new TimeWindow(kv.key, kv.key + windows.size());
                        context.forward(new Windowed<>(key, tw), kv.value);
                        // mark slot flushed by writing zero result value
                        store.put(key, zeroRes, kv.key);
                    }
                }
            }

            return null;
        }

        public void close() {
        }
    }

With this sample code I don't even get duplicates in the output topic 
when the processor is restarted and I also don't get any non-final 
results of windowed aggregations.

The question is whether such a transformer is correct (have I missed 
something?) and whether it is comparable to the DSL implementation above 
in terms of performance (will have to test).

Also, is it possible to make such a stream processor redundant (tolerant 
of the loss of the local window store), and how?

I still hope that the DSL variant could be made to work.

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-26 Thread Peter Levart

On 12/21/18 3:16 PM, Peter Levart wrote:
> I also see some results that are actual non-final window aggregations
> that precede the final aggregations. These non-final results are never
> emitted out of order (for example, no such non-final result would ever
> come after the final result for a particular key/window).

Absence of proof is not proof of absence... I have since observed (using 
the DSL variant, not the custom Transformer) an occurrence of a 
non-final result that was emitted after a restart of the streams 
processor, while the final result for the same key/window had been 
emitted before the restart:

[pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550, 
81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856

... restart ...
[pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550] 
INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648

So the app logic cannot even rely on a guarantee that results are 
ordered. This is really not usable until the bug is fixed.

Regards, Peter

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-07 Thread John Roesler
Hi Peter,

Sorry, I have only just now seen this thread.

You asked if this behavior is unexpected, and the answer is yes.
Suppressed.untilWindowCloses is intended to emit only the final result,
regardless of restarts.

You also asked how the suppression buffer can resume after a restart, since
it's not persistent. The answer is the same as for in-memory stores. The
state of the store (or buffer, in this case) is persisted to a changelog
topic, which is re-read on restart to re-create the exact state prior to
shutdown. "Persistent" in the store nomenclature refers only to "persistent
on the local disk".
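
The changelog idea described above can be sketched in plain Java. This is only a toy model under stated assumptions: the class ChangelogBackedBuffer, its Update record type, and the use of a List as a stand-in for the changelog topic are all hypothetical and are not how Kafka Streams implements its buffer. It shows why purely in-memory state can still survive a restart if every change is also appended to a durable log.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangelogBackedBuffer {
    // One changelog record; a null value is a tombstone (the key was removed).
    static final class Update {
        final String key;
        final String value;
        Update(String key, String value) { this.key = key; this.value = value; }
    }

    private final List<Update> changelog;                       // stand-in for the changelog topic
    private final Map<String, String> state = new HashMap<>();  // in-memory only

    // Creating a buffer replays the whole changelog, as a restart would.
    ChangelogBackedBuffer(List<Update> changelog) {
        this.changelog = changelog;
        for (Update u : changelog) {
            applyToState(u);
        }
    }

    void put(String key, String value) {
        Update u = new Update(key, value);
        changelog.add(u);   // record the change in the log
        applyToState(u);    // then apply it to the in-memory state
    }

    void remove(String key) {
        put(key, null);     // removal is logged as a tombstone
    }

    String get(String key) { return state.get(key); }

    int size() { return state.size(); }

    private void applyToState(Update u) {
        if (u.value == null) state.remove(u.key); else state.put(u.key, u.value);
    }
}
```

Constructing a second ChangelogBackedBuffer over the same list plays the role of the restart: it ends up with exactly the entries the first instance held.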

Just to confirm your response regarding the buffer size:
While it is better to use the public ("Suppressed.BufferConfig.unbounded()")
API, yes, your buffer was already unbounded.

I looked at your custom transformer, and it looks almost correct to me. The
only flaw seems to be that it only looks for closed windows for the key
currently being processed, which means that if you have key "A" buffered,
but don't get another event for it for a while after the window closes, you
won't emit the final result. This might actually take longer than the
window retention period, in which case, the data would be deleted without
ever emitting the final result.
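
The flaw described above can be reproduced with a small plain-Java model. It is a simplified sketch, not the transformer itself: PerKeyFlush, its in-memory maps, and the integer sum aggregation are assumptions for illustration, and flushed windows are removed here instead of being marked with a zero value. Only the flush condition (window start < event time minus window size minus grace period) follows the transformer code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PerKeyFlush {
    static final long WINDOW_MS = 1_000L;
    static final long GRACE_MS = 5_000L;

    // key -> (window start -> running sum); a simplified stand-in for a window store
    private final Map<String, TreeMap<Long, Integer>> store = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    void process(String key, int value, long ts) {
        long start = ts - (ts % WINDOW_MS);
        TreeMap<Long, Integer> windows = store.computeIfAbsent(key, k -> new TreeMap<>());
        windows.merge(start, value, Integer::sum);

        // Flush overdue windows, but only those of the key being processed.
        long limit = ts - WINDOW_MS - GRACE_MS;
        Iterator<Map.Entry<Long, Integer>> it =
            windows.headMap(limit, false).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Integer> e = it.next();
            emitted.add(key + "@" + e.getKey() + "=" + e.getValue());
            it.remove();
        }
    }
}
```

If "A" and "B" both have a window starting at 0 and only "B" keeps receiving events, the window of "B" is emitted while the window of "A" stays in the store until the next event for "A" arrives.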

You said you think it should be possible to get the DSL version working,
and I agree, since this is exactly what it was designed for. Do you mind
filing a bug in the "KAFKA" Jira project (
https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
keep the investigation organized that way.

In the meantime, I'll take another look at your logs above and try to
reason about what could be wrong.

Just one clarification... For example, you showed
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

Am I correct in thinking that the first, shorter list is the "incremental"
version, and the second is the "final" version? I think so, but am confused

Thanks for the report,


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-08 Thread Peter Levart

Hi John,

On 1/7/19 9:10 PM, John Roesler wrote:

> I looked at your custom transformer, and it looks almost correct to me. The
> only flaw seems to be that it only looks for closed windows for the key
> currently being processed, which means that if you have key "A" buffered,
> but don't get another event for it for a while after the window closes, you
> won't emit the final result. This might actually take longer than the
> window retention period, in which case, the data would be deleted without
> ever emitting the final result.

So in the DSL case, the suppression works by flushing *all* of the "ripe" 
windows in the whole buffer whenever a single event comes in with a 
recent enough timestamp, regardless of the key of that event?

Is the buffer shared among processing tasks or does each task maintain 
its own private buffer that only contains its share of data pertaining 
to its assigned input partitions? In case the tasks are executed on 
several processing JVM(s) the buffer can't really be shared, right? In 
that case a single event can't flush all of the "ripe" windows, but just 
those that are contained in the task's part of the buffer...

> You said you think it should be possible to get the DSL version working,
> and I agree, since this is exactly what it was designed for. Do you mind
> filing a bug in the "KAFKA" Jira project (
> https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
> keep the investigation organized that way.

Will do that.

> In the meantime, I'll take another look at your logs above and try to
> reason about what could be wrong.
>
> Just one clarification... For example, you showed
>
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
> 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
>
> Am I correct in thinking that the first, shorter list is the "incremental"
> version, and the second is the "final" version? I think so, but am confused

It's the other way around. The 1st list (usually the longer one) is what 
has just been consumed, and the second is what had been consumed before 
that for the same key (I maintain a ConcurrentHashMap of consumed 
entries in the test and execute: secondList = map.put(key, firstList)).
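
The bookkeeping described above can be sketched as follows. The actual test consumer is not shown in the thread, so the class ConsumedTracker, the method onConsumed, and the exact log line layout are assumptions; only the use of a ConcurrentHashMap and of the return value of put() comes from the description.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

final class ConsumedTracker {
    // windowed key -> the list most recently consumed for that key/window
    private final ConcurrentHashMap<String, List<Integer>> latest = new ConcurrentHashMap<>();

    // Records a consumed result and returns the line to log for it.
    String onConsumed(String windowedKey, List<Integer> firstList) {
        // put() returns the value previously stored under the key, or null.
        List<Integer> secondList = latest.put(windowedKey, firstList);
        String line = "[" + windowedKey + "] -> " + firstList;
        if (secondList != null) {
            // A second result for the same key/window: a duplicate or an update.
            line += " INSTEAD OF " + secondList;
        }
        return line;
    }
}
```

In this scheme the list printed before "INSTEAD OF" is always the newly consumed one, and the list printed after it is the one it replaced.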

In the majority of cases, the consumed list is an incremental update of 
some previous version of the list (not necessarily a direct descendant) 
consumed before that, but as I said, I also observed the final window 
result before a processor restart and, after the restart, some previous 
non-final version of the window aggregation for the same key.

May I also note that there is some "jitter" in the input timestamps 
because I'm trying to model a real use case where there will be several 
inputs to the system with only approximately synchronized clocks. The 
jitter is kept well below the TimeWindow grace period so there should be 
no events consumed by the processor that belong to windows that have 
already been flushed.
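
The argument above can be checked with a little arithmetic. This is a sketch under assumptions: the helper names are hypothetical, and the rule that a window is closed once stream time reaches window end plus grace is a working assumption here, not something stated in this thread. The numbers (1 s windows, 5 s grace, at most 4 s of jitter) are the ones from the first message.

```java
final class TumblingWindowMath {
    // Start of the tumbling window that contains ts (assumes ts >= 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Assumption: a window counts as closed once stream time has reached
    // window end + grace. The exact boundary (<= versus <) is a guess.
    static boolean isClosed(long windowStart, long sizeMs, long graceMs, long streamTime) {
        return windowStart + sizeMs + graceMs <= streamTime;
    }

    public static void main(String[] args) {
        long size = 1_000L, grace = 5_000L, maxJitter = 4_000L, streamTime = 100_000L;
        for (long jitter = 0; jitter <= maxJitter; jitter += 500) {
            long ts = streamTime - jitter;
            long start = windowStart(ts, size);
            System.out.println("ts=" + ts + " window=" + start
                + " closed=" + isClosed(start, size, grace, streamTime));
        }
    }
}
```

An event that lags stream time by at most 4 s falls into a window that ends after its own timestamp, so window end plus 5 s of grace is still ahead of stream time.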

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-08 Thread Peter Levart

Hi John,

On 1/8/19 12:45 PM, Peter Levart wrote:
>> I looked at your custom transformer, and it looks almost correct to
>> me. The only flaw seems to be that it only looks for closed windows
>> for the key currently being processed, which means that if you have
>> key "A" buffered, but don't get another event for it for a while after
>> the window closes, you won't emit the final result. This might
>> actually take longer than the window retention period, in which case,
>> the data would be deleted without ever emitting the final result.
>
> So in DSL case, the suppression works by flushing *all* of the "ripe"
> windows in the whole buffer whenever a single event comes in with
> recent enough timestamp regardless of the key of that event?
>
> Is the buffer shared among processing tasks or does each task maintain
> its own private buffer that only contains its share of data pertaining
> to assigned input partitions? In case the tasks are executed on
> several processing JVM(s) the buffer can't really be shared, right? In
> that case a single event can't flush all of the "ripe" windows, but
> just those that are contained in the task's part of buffer...

Just a question about your comment above:

/"This might actually take longer than the window retention period, in 
which case, the data would be deleted without ever emitting the final 
result"/
Are you talking about the buffer log topic retention? Aren't log topics 
configured to "compact" rather than "delete" messages? So the last 
"version" of the buffer entry for a particular key should stay forever? 
What are the keys in suppression buffer log topic? Are they a pair of 
(timestamp, key) ? Probably not since in that case the compacted log 
would grow indefinitely...

Another question:

What are the keys in WindowStore's log topic? If the input keys to the 
processor that uses such WindowStore consist of a bounded set of values 
(for example user ids), would compacted log of such WindowStore also be 
bounded?
Regards, Peter

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-08 Thread Peter Levart

On 1/8/19 12:57 PM, Peter Levart wrote:

> Another question:
>
> What are the keys in WindowStore's log topic? If the input keys to the
> processor that uses such WindowStore consist of a bounded set of
> values (for example user ids), would compacted log of such WindowStore
> also be bounded?

If the key of the WindowStore log topic is (timestamp, key), would 
explicitly deleting flushed entries from the WindowStore (by putting a 
null value into the store) keep the compacted log bounded? In other 
words, does the WindowStore log topic support a special kind of 
"tombstone" message that effectively removes the key from the compacted 
log?

In that case, my custom processor could keep entries in its WindowStore 
for as long as needed, depending on the activity of a particular input 
key...

Regards, Peter

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-10 Thread John Roesler
Hi Peter,

Regarding retention, I was not referring to log retention, but to the
window store retention. Since a new window is created every second (for
example), there are in principle an unbounded number of windows (the longer
the application runs, the more windows there are, with no end). However, we
obviously can't store an infinite amount of data, so the window definition
includes a retention period. By default, this is 24 hours. After the
retention period elapses, all of the data for the window is purged to make
room for new windows.

So what I meant was that if you buffer some key "A" in window (Monday
09:00:00) and then get no further activity for A for over 24 hours, then
when you do get that next event for A, say at (Tuesday 11:00:00), you'd do
the scan but find nothing, since your buffered state would already have
been purged from the store.
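
The Monday/Tuesday scenario above can be modelled with a toy store. This is a deliberately simplified sketch: RetainedWindowStore is a hypothetical class, it purges on every write relative to the window start being written, and it does not reproduce how the real window store expires data. It only illustrates that a result left unemitted for longer than the retention period is gone by the time the next event for its key triggers a scan.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class RetainedWindowStore {
    private final long retentionMs;
    // window start -> (key -> value), ordered so that old windows are easy to drop
    private final TreeMap<Long, Map<String, Integer>> windows = new TreeMap<>();

    RetainedWindowStore(Duration retention) {
        this.retentionMs = retention.toMillis();
    }

    // Stores a value, then purges every window that started more than the
    // retention period before the window being written.
    void put(String key, int value, long windowStart) {
        windows.computeIfAbsent(windowStart, s -> new HashMap<>()).put(key, value);
        windows.headMap(windowStart - retentionMs, false).clear();
    }

    // Returns the stored value, or null if it was never written or was purged.
    Integer fetch(String key, long windowStart) {
        Map<String, Integer> w = windows.get(windowStart);
        return w == null ? null : w.get(key);
    }
}
```

With a 24-hour retention, a value written for a window at hour 9 is no longer found after a write for a window at hour 35 (11:00 the next day).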

The way I avoided this problem for Suppression was to organize the data by
timestamp instead of by key, so on *every* update I can search for all the
keys that are old enough and emit them. I also don't use a window store, so
I don't have to worry about the retention time.
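
A buffer organized by timestamp, as described above, might look roughly like this in plain Java. It is a sketch under assumptions, not the actual suppression buffer: TimeOrderedBuffer, its method names, and the integer values are made up for illustration. The point is that eviction walks the buffer from the oldest timestamp and never looks at which key triggered it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TimeOrderedBuffer {
    // close time -> (key -> latest value), sorted by close time
    private final TreeMap<Long, Map<String, Integer>> byTime = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    void put(long closeTime, String key, int value) {
        byTime.computeIfAbsent(closeTime, t -> new LinkedHashMap<>()).put(key, value);
    }

    // Called on every update, whatever its key: emits all entries that are old enough.
    void evictUpTo(long streamTime) {
        while (!byTime.isEmpty() && byTime.firstKey() <= streamTime) {
            Map.Entry<Long, Map<String, Integer>> oldest = byTime.pollFirstEntry();
            for (Map.Entry<String, Integer> kv : oldest.getValue().entrySet()) {
                emitted.add(kv.getKey() + "@" + oldest.getKey() + "=" + kv.getValue());
            }
        }
    }
}
```

Because the oldest entries sit at the head of the sorted map, an update for key "B" is enough to emit a ripe result for key "A".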

To answer your question about the window store's topic, it configures a
retention time the same length as the store's retention time (and the keys
are the full windowed key, including the window start time), so it'll have
roughly the same size bound as the store itself.

Back to the process of figuring out what might be wrong with Suppression, I
don't suppose you would be able to file a Jira and upload a repro program?
If not, that's ok. I haven't been able to reproduce the bug yet, but it
seems like it's happening somewhat consistently for you, so I should be
able to get it to happen eventually.

Thanks, and sorry again for the troubles.

On Tue, Jan 8, 2019 at 6:48 AM Peter Levart  wrote:

> On 1/8/19 12:57 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/8/19 12:45 PM, Peter Levart wrote:
> >>> I looked at your custom transformer, and it looks almost correct to
> >>> me. The only flaw seems to be that it only looks for closed windows
> >>> for the key currently being processed, which means that if you have
> >>> key "A" buffered, but don't get another event for it for a while
> >>> after the window closes, you won't emit the final result. This might
> >>> actually take longer than the window retention period, in which
> >>> case, the data would be deleted without ever emitting the final
> >>> result.
> >>
> >> So in the DSL case, the suppression works by flushing *all* of the
> >> "ripe" windows in the whole buffer whenever a single event comes in
> >> with a recent enough timestamp, regardless of the key of that event?
> >>
> >> Is the buffer shared among processing tasks or does each task
> >> maintain its own private buffer that only contains its share of data
> >> pertaining to assigned input partitions? In case the tasks are
> >> executed on several processing JVM(s) the buffer can't really be
> >> shared, right? In that case a single event can't flush all of the
> >> "ripe" windows, but just those that are contained in the task's part
> >> of buffer...
> >
> > Just a question about your comment above:
> >
> > /"This might actually take longer than the window retention period, in
> > which case, the data would be deleted without ever emitting the final
> > result"/
> >
> > Are you talking about the buffer log topic retention? Aren't log
> > topics configured to "compact" rather than "delete" messages? So the
> > last "version" of the buffer entry for a particular key should stay
> > forever? What are the keys in suppression buffer log topic? Are they a
> > pair of (timestamp, key) ? Probably not since in that case the
> > compacted log would grow indefinitely...
> >
> > Another question:
> >
> > What are the keys in WindowStore's log topic? If the input keys to the
> > processor that uses such WindowStore consist of a bounded set of
> > values (for example user ids), would compacted log of such WindowStore
> > also be bounded?
> In case the key of the WindowStore log topic is (timestamp, key), would
> explicitly deleting flushed entries from the WindowStore (by putting a
> null value into the store) keep the compacted log bounded? In other
> words, does the WindowStore log topic support a special kind of
> "tombstone" message that effectively removes the key from the compacted
> log?
> In that case, my custom processor could keep entries in its WindowStore
> for as long as needed, depending on the activity of a particular input
> key...
> >
> > Regards, Peter
> >
> >

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-14 Thread John Roesler
Hi Peter,

I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we have implemented
an optimization to avoid a lot of compaction overhead in RocksDB, as
well as saving on range scans.

Instead of storing everything in one database, we open several
databases and bucket windows into them. Then, when windows
expire, we just ignore the records (i.e., the API makes them unreachable,
but we don't actually delete them). Once all the windows in a database
are expired, we just close and delete the whole database. Then, we open
a new one for new windows. If you look in the code, these databases are
called "segments".
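
To make the segment idea concrete, here is a rough plain-Java sketch. It is not the RocksDB-backed implementation: SegmentedWindowStore, its in-memory maps, and the retentionMs/segmentIntervalMs parameters are stand-ins chosen for illustration. It shows the two behaviours described above: expired windows become unreachable through the API before they are physically deleted, and deletion happens one whole segment at a time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of a window store that is bucketed into segments. */
class SegmentedWindowStore<V> {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segmentId -> records of all windows whose start time falls into that segment
    private final TreeMap<Long, Map<String, V>> segments = new TreeMap<>();
    private long observedStreamTime = 0L;

    SegmentedWindowStore(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    void put(String key, long windowStart, V value) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        long minLive = observedStreamTime - retentionMs;
        if (windowStart < minLive) {
            return; // already expired on arrival
        }
        long segmentId = Math.floorDiv(windowStart, segmentIntervalMs);
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key + "@" + windowStart, value);
        // Drop only segments in which EVERY window is expired; no per-record deletes.
        segments.headMap(Math.floorDiv(minLive, segmentIntervalMs), false).clear();
    }

    V fetch(String key, long windowStart) {
        if (windowStart < observedStreamTime - retentionMs) {
            return null; // expired: unreachable even if its segment still exists
        }
        Map<String, V> segment = segments.get(Math.floorDiv(windowStart, segmentIntervalMs));
        return segment == null ? null : segment.get(key + "@" + windowStart);
    }

    int segmentCount() {
        return segments.size();
    }
}
```

In this model a record can outlive its retention period physically (its segment is still present) while already being invisible to readers, which is why explicit per-record deletion does not fit the layout well.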

Thus, I don't think that you should attempt to use the built-in window
stores as you described. Instead, it should be straightforward to
implement your own StateStore with a layout that's more favorable to
your desired behavior.

You should also be able to set up the change log the way you need.
Explicitly removed entries would also get removed from the log, if
it's a compacted log.

Actually, what you're describing is *very* similar to the implementation
for suppress. I might actually suggest that you just copy the suppression
implementation and adapt it to your needs, or at the very least, study
how it works. In doing so, you might actually discover the cause of the
bug yourself!

I hope this helps, and thanks for your help,
-John

On Sat, Jan 12, 2019 at 5:45 AM Peter Levart  wrote:

> Hi John,
> Thank you very much for explaining how WindowStore works. I have some
> more questions...
> On 1/10/19 5:33 PM, John Roesler wrote:
> > Hi Peter,
> >
> > Regarding retention, I was not referring to log retention, but to the
> > window store retention.
> > Since a new window is created every second (for example), there is in
> > principle an unbounded number of windows (the longer the application
> > runs, the more windows there are, with no end).
> > However, we obviously can't store an infinite amount of data, so the
> > window definition includes a retention period. By default, this is 24
> > hours. After the retention period elapses, all of the data for the
> > window is purged to make room for new windows.
> Right. Would the following work for example:
> - configure retention of WindowStore to be "infinite"
> - explicitly remove records from the store when windows are flushed out
> - configure WindowStore log topic for compacting
> Something like the following:
>     Stores
>         .windowStoreBuilder(
>             Stores.persistentWindowStore(
>                 storeName,
>                 // retentionPeriod: effectively "infinite". Duration.of(...)
>                 // rejects ChronoUnit.YEARS (an estimated unit), so use days.
>                 Duration.ofDays(365_000L),
>                 Duration.ofSeconds(10), // windowSize
>                 false                   // retainDuplicates
>             ),
>             keySerde, valSerde
>         )
>         .withCachingEnabled()
>         .withLoggingEnabled(
>             Map.of(
>             )
>         );
> In the above scenario, would:
> - the on-disk WindowStore be kept bounded (there could be some very old
> entries in it, but the majority would be new, depending on the activity
> of particular input keys)?
> - the log topic be kept bounded (explicitly removed entries would be
> removed from the compacted log too)?
> I'm moving away from DSL partly because I have some problems with
> suppression (which I hope we'll be able to fix) and partly because the
> DSL can't give me the complicated semantics that I need for the
> application at hand. I tried to capture what I need in a custom
> Transformer here:
> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
> Your knowledge of how WindowStore works would greatly help me decide if
> this is a workable idea.
> >
> > So what I meant was that if you buffer some key "A" in window (Monday
> > 09:00:00) and then get no further activity for A for over 24 hours,
> > then when you do get that next event for A, say at (Tuesday 11:00:00),
> > you'd do the scan but find nothing, since your buffered state would
> > already have been purged from the store.
> Right. That would be the case when WindowStore was configured with
> default retention of 24 hours. A quick question: What does window size
> configuration for WindowStore (see above) do? Does it have to be
> synchronized with the size of windows stored in it?
> >
> > The way I avoided this problem for Suppression was to organize the data
> > by timestamp instead of by key, so on *every* update I can search for
> > all the keys that are old enough and emit them.
> > I also don't use a window store, so I don't have to worry about the
> > retention time.

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-22 Thread John Roesler
Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?
* have you tried this with EOS enabled? I can imagine some ways that there
could be duplicates, but they should be impossible with EOS enabled.

Thanks for your help,
-John


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-23 Thread Peter Levart

Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still 
have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:
> Hi Peter,
>
> Just to follow up on the actual bug, can you confirm whether:
> * when you say "restart", do you mean orderly shutdown and restart, or
> crash and restart?

I start it as SpringBoot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting... So I think it is by SIGTERM which initiates JVM shutdown hook(s).

> * have you tried this with EOS enabled? I can imagine some ways that there
> could be duplicates, but they should be impossible with EOS enabled.

Yes, I have EOS enabled.

> Thanks for your help,
> -John

Regards, Peter


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-24 Thread John Roesler
Hi Peter,

Thanks for the clarification.

When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
think that Streams automatically registers a shutdown hook. In our examples
and demos, we register a shutdown hook "outside" of streams (right next to
the code that calls start() ).
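
As a sketch of what registering a shutdown hook "outside" of Streams can look like: the snippet below uses a plain AutoCloseable as a stand-in for the KafkaStreams object so that it runs without the library, and the class and method names (ShutdownHookSketch, closingHook) are made up for the example. With the real library, the lambda would be replaced by the streams instance, whose close() performs the orderly shutdown.

```java
/** Hypothetical example: close a long-running component when the JVM receives SIGTERM. */
class ShutdownHookSketch {
    /** Builds (but does not register) a hook thread that closes the given resource. */
    static Thread closingHook(AutoCloseable streams) {
        return new Thread(() -> {
            try {
                streams.close(); // orderly shutdown: the place where flush and commit happen
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "streams-shutdown-hook");
    }

    public static void main(String[] args) {
        // Stand-in for a KafkaStreams instance (assumption made for this sketch).
        AutoCloseable streams = () -> System.out.println("closing streams before exit");
        Runtime.getRuntime().addShutdownHook(closingHook(streams));
        // With the real library, streams.start() would be called right here.
    }
}
```

The JVM runs registered hooks on SIGTERM and on normal exit, but not on SIGKILL, so a hard kill still behaves like a crash.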
Unless I missed something, a SIGTERM would still cause Streams to exit
abruptly, skipping flush and commit. This can cause apparent duplicates *if
you're not using EOS or if you're reading uncommitted transactions*.

The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

* buffer state X
* flush state X to buffer changelog
* commit transaction T0; start new transaction T1
* emit final result X (in uncommitted transaction T1)
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
* commit transaction T2

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
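
The scenario above can be replayed with a toy model of a transactional topic. This is a deliberately simplified sketch: TransactionalLogSketch is a hypothetical class, a transaction that is never committed is treated as aborted, and broker details such as the last stable offset are ignored. It only shows why the same final result appears twice to a reader that does not filter out aborted transactions and once to a reader that does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical toy model of an output topic written by a transactional producer. */
class TransactionalLogSketch {
    private final List<String[]> log = new ArrayList<>(); // each entry: {transactionId, value}
    private final Set<String> committed = new HashSet<>();

    void send(String txn, String value) {
        log.add(new String[] {txn, value});
    }

    void commit(String txn) {
        committed.add(txn);
    }

    /** readCommitted=false returns every appended record; true skips uncommitted transactions. */
    List<String> read(boolean readCommitted) {
        List<String> out = new ArrayList<>();
        for (String[] rec : log) {
            if (!readCommitted || committed.contains(rec[0])) {
                out.add(rec[1]);
            }
        }
        return out;
    }

    /** Replays the crash scenario: X is emitted in T1 (aborted by the crash), then again in T2. */
    static TransactionalLogSketch crashScenario() {
        TransactionalLogSketch topic = new TransactionalLogSketch();
        topic.send("T1", "final result X"); // emitted, then the process crashes before commit
        topic.send("T2", "final result X"); // after restart, X is restored and emitted again
        topic.commit("T2");
        return topic;
    }
}
```

Reading crashScenario() with readCommitted=false yields two copies of the result, while readCommitted=true yields one.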

Likewise, if you were to attach a callback, like "foreach" downstream of
the suppression, you would see duplicates in the case of a crash. Callbacks
are a general "hole" in EOS, which I have some ideas to close, but that's a
separate topic.

There may still be something else going on, but I'm trying to start with
the simpler explanations.

Thanks again,



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-24 Thread Peter Levart

Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:
> Hi Peter,
>
> Thanks for the clarification.
>
> When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
> think that Streams automatically registers a shutdown hook. In our examples
> and demos, we register a shutdown hook "outside" of streams (right next to
> the code that calls start() ).
> Unless I missed something, a SIGTERM would still cause Streams to exit
> abruptly, skipping flush and commit. This can cause apparent duplicates *if
> you're not using EOS or if you're reading uncommitted transactions*.

The fact is that Spring which I use to instantiate the KafkaStreams 
object does that:

    @Bean(initMethod = "start", destroyMethod = "close")
    public KafkaStreams processorStreams(...

...so when the JVM gets SIGTERM, the shutdown hook that Spring installs shuts
down the ApplicationContext, which calls all "destroyMethod"(s) on
registered Bean(s)...

And the duplicates are less apparent but still occur even in EOS mode...
But they are not actual duplicates. They are duplicates only by
windowed key; the values are different...

> The reason is that, upon restart, the suppression buffer can only
> "remember" what got sent & committed to its changelog topic before.
>
> The scenario I have in mind is:
>
> * buffer state X
> * flush state X to buffer changelog
> * commit transaction T0; start new transaction T1
> * emit final result X (in uncommitted transaction T1)
> * crash before flushing to the changelog the fact that state X was emitted.
> Also, transaction T1 gets aborted, since we crash before committing.
> * restart, restoring state X again from the changelog (because the emit
> didn't get committed)
> * start transaction T2
> * emit final result X again (in uncommitted transaction T2)
> * commit transaction T2
>
> So, the result gets emitted twice, but the first time is in an aborted
> transaction. This leads me to another clarifying question:
>
> Based on your first message, it seems like the duplicates you observe are
> in the output topic. When you read the topic, do you configure your
> consumer with "read committed" mode? If not, you'll see "results" from
> uncommitted transactions, which could explain the duplicates.

So when EOS is enabled, the output topics are used in a transactional
manner. The consumer of such a topic should enable read_committed semantics.

That would do if my problem was about seeing duplicates of final
windowing results. That is not my problem. My problem is that upon
restart of the processor, I see some non-final window aggregations, followed
by final aggregations for the same windowed key. That's harder to
tolerate in an application. If it was just duplicates of the "correct"
aggregation, I could ignore the 2nd and subsequent messages for the same
windowed key, but if I first get a non-final aggregation, I cannot simply
ignore the 2nd occurrence of the same windowed key. I must cope with
"replacing the previous aggregation with a new version of it" in the app.
This means that suppression of non-final results does not buy me anything,
as it is not guaranteeing that.

Is it possible that non-final windowed aggregations are emitted in some
scenario, but then such a transaction is rolled back, and I would not see
the non-final aggregations if I enabled read_committed isolation on the
consumer?

I think I'll have to reinstate the demo and try that...

Stay tuned.

Regards, Peter

> Likewise, if you were to attach a callback, like "foreach" downstream of
> the suppression, you would see duplicates in the case of a crash. Callbacks
> are a general "hole" in EOS, which I have some ideas to close, but that's a
> separate topic.
>
> There may still be something else going on, but I'm trying to start with
> the simpler explanations.
>
> Thanks again,


On Wed, Jan 23, 2019 at 5:11 AM Peter Levart  wrote:

Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still
have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:

Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?

I start it as SpringBoot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting... So I think it is by SIGTERM which initiates JVM shutdown

* have you tried this with EOS enabled? I can imagine some ways that


could be duplicates, but they should be impossible with EOS enabled.

Yes, I have EOS enabled.

Thanks for your help,

Regards, Peter

On Mon, Jan 14, 2019 at 1:20 PM John Roesler  wrote:

Hi Peter,

I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we 

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-24 Thread Peter Levart

Hi John,

Haven't been able to reinstate the demo yet, but I have been re-reading 
the following scenario of yours

On 1/24/19 11:48 PM, Peter Levart wrote:
> Hi John,
>
> On 1/24/19 3:18 PM, John Roesler wrote:
> > The reason is that, upon restart, the suppression buffer can only
> > "remember" what got sent & committed to its changelog topic before.
> >
> > The scenario I have in mind is:
> >
> > * buffer state X
> > * flush state X to buffer changelog
> > * commit transaction T0; start new transaction T1
> > * emit final result X (in uncommitted transaction T1)
> > * crash before flushing to the changelog the fact that state X was
> > emitted.
> > Also, transaction T1 gets aborted, since we crash before committing.
> > * restart, restoring state X again from the changelog (because the emit
> > didn't get committed)
> > * start transaction T2
> > * emit final result X again (in uncommitted transaction T2)
> > * commit transaction T2
> >
> > So, the result gets emitted twice, but the first time is in an aborted
> > transaction. This leads me to another clarifying question:
> >
> > Based on your first message, it seems like the duplicates you observe
> > are in the output topic. When you read the topic, do you configure your
> > consumer with "read committed" mode? If not, you'll see "results" from
> > uncommitted transactions, which could explain the duplicates.

...and I was thinking that perhaps the right solution to the suppression
problem would be to use transactional producers for the resulting output
topic AND the store change-log. Is this possible? Does the compaction of
the log on the brokers work for transactional producers as expected? In
that case, the sending of the final result and the marking of that fact in
the store change log would together be an atomic operation.

That said, I think there's another problem with suppression, which looks
as if the suppression processor is already processing the input while the
state store has not been fully restored yet, or something related... Is
this guaranteed not to happen?

And now something unrelated I wanted to ask...

I'm trying to create my own custom state store. From the API I can see
it is pretty straightforward. One thing that I don't quite understand is
how Kafka Streams knows whether to replay the whole change log after the
store registers itself or just a part of it, and which part (from which
offset per partition). There doesn't seem to be any API point through
which the store could communicate this information back to Kafka
Streams. Is such bookkeeping performed outside the store? Does Kafka
Streams first invoke flush() on the store and then note down the
offsets from the change log producer somewhere? So the next time the store
is brought up, the log is only replayed from the last noted offset? So
it can happen that the store gets some log entries that have already
been incorporated in it (from the point of one flush before) but never
misses any... In any case there has to be an indication somewhere that
the store didn't survive and has to be rebuilt from scratch. How does
Kafka Streams detect that situation? By placing some marker file into
the directory reserved for the store's local storage?

Regards, Peter

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-25 Thread John Roesler
Hi Peter,

Thanks for the replies.

Regarding transactions:
Yes, actually, with EOS enabled, the changelog and the output topics are
all produced with the same transactional producer, within the same
transactions. So it should already be atomic.

Regarding restore:
Streams doesn't put the store into service until the restore is completed,
so it should be guaranteed not to happen. But there's of course no
guarantee that I didn't mess something up. I'll take a hard look at it.

Regarding restoration and offsets:
Your guess is correct: Streams tracks the latest stored offset outside of
the store implementation itself, specifically by writing a file (called a
Checkpoint File) in the state directory. If the file is there, it reads
that offset and restores from that point. If the file is missing, it
restores from the beginning of the stream. So it should "just work" for
you. Just for completeness, there have been several edge cases discovered
where this mechanism isn't completely safe, so in the case of EOS, I
believe we actually disregard that checkpoint file and the prior state and
always rebuild from the earliest offset in the changelog.
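
The checkpoint logic described above can be sketched as follows. This is plain Java with invented names (RestoreSketch, restoreStartOffset, replay), not the Streams internals, and it makes two assumptions: the checkpoint holds the offset of the first changelog record not yet reflected in the local store, and the EOS branch simply mirrors the "I believe" statement above rather than verified behaviour.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical model of restoring a local store from its changelog. */
class RestoreSketch {
    /** Picks the changelog offset at which replay starts. */
    static long restoreStartOffset(OptionalLong checkpoint, long earliestOffset, boolean eosEnabled) {
        if (eosEnabled || !checkpoint.isPresent()) {
            return earliestOffset; // no trusted checkpoint: rebuild from the start of the changelog
        }
        return checkpoint.getAsLong(); // local state is valid up to here: replay only the tail
    }

    /** Replays changelog records {key, value} from startOffset; a null value is a delete. */
    static void replay(Map<String, String> store, List<String[]> changelog, long startOffset) {
        for (int i = (int) startOffset; i < changelog.size(); i++) {
            String[] rec = changelog.get(i);
            if (rec[1] == null) {
                store.remove(rec[0]);
            } else {
                store.put(rec[0], rec[1]);
            }
        }
    }
}
```

Because puts and deletes are idempotent, replaying a few records that the store already contains is harmless; the only thing that must never happen is skipping records, which is why a missing checkpoint falls back to a full replay.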

Personally, I would like to see us provide the ability to store the
checkpoint inside the state store, so that checkpoint updates are
linearized correctly w.r.t. data updates, but I actually haven't mentioned
this thought to anyone until now ;)

Finally, regarding your prior email:
Yes, I was thinking that the "wrong" output values might be part of
rolled-back transactions and therefore enabling read-committed mode on the
consumer might tell a different story than what you've seen to date.

I'm honestly still baffled about those intermediate results that are
sneaking out. I wonder if it's something specific to your data stream, like
maybe there is an edge case when two records have exactly the same
timestamp? I'll have to stare at the code some more...

Regardless, in order to reap the benefits of running the app with EOS, you
really have to also set your consumers to read_committed. Otherwise, you'll
be seeing output data from aborted (aka rolled-back) transactions, and you
miss the intended "exactly once" guarantee.
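
As a concrete illustration of that consumer setting, here is a minimal sketch using plain string keys so it compiles without the Kafka client jar. `isolation.level` is the consumer configuration in question; its default is `read_uncommitted`. The class name, group id and broker address are placeholders, and the key and value deserializers, which a real consumer also requires, are left out because they depend on your data types.

```java
import java.util.Properties;

final class ReadCommittedConsumerSketch {

    // Builds consumer properties that hide records from aborted
    // (rolled-back) transactions. Deserializers must still be added
    // before these properties can be used to create a real consumer.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only return records from committed transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "output-checker");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

With this set on whatever reads the output topic, a final result emitted inside an aborted transaction should not be visible to that reader.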



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-02-26 Thread John Roesler
Hi again, Peter,

Just to close the loop about the bug in Suppress, we did get (apparently)
the same report from a few other people:
https://issues.apache.org/jira/browse/KAFKA-7895

I also managed to reproduce the duplicate-result behavior, which could
cause it to emit both intermediate results and duplicate final results.

There's a patch for it in the 2.2 release candidate. Perhaps you can try it
out and see if it resolves the issue for you?

I'm backporting the fix to 2.1 as well, but I unfortunately missed the last
2.1 bugfix release.



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-03-01 Thread Jonathan Santilli
Hello John, hope you are well.
I have tested the version 2.2 release candidate (although I know it has
been postponed).
I have been following this email thread because I think I am experiencing the
same issue. I have reported it in an email to this list, and all the
details are also on SO (
https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
).

After the test, the result is the same as before (at least in my case):
already processed records are passed to the output topic again, causing
data duplication:

...
2019-03-01 16:55:23,808 INFO
[XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
internals.StoreChangelogReader (StoreChangelogReader.java:221) -
stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
checkpoint found for task 1_10 state store
XXX-KTABLE-SUPPRESS-STATE-STORE-11-changelog-10 with EOS turned on.
*Reinitializing the task and restore its state from the beginning.*
...

I was hoping for this to be fixed, but that is not the case, at least in my
case.

If you can, please take a look at the question on SO. I was in contact with
Matthias about it; he also pointed me to the place where the potential bug
could be present.

Please let me know your thoughts.



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-03-01 Thread Jonathan Santilli
BTW, after stopping the app gracefully (KafkaStreams#close()), this error shows
up repeatedly:

2019-03-01 17:18:07,819 WARN
[XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
[0_0] Failed to write offset checkpoint file to
[/tmp/kafka-stream/XXX/0_0/.checkpoint]

java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp
(No such file or directory)
at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]

However, I have checked and the folder created starts with: *1_*

ls -lha /tmp/kafka-stream/XXX/1_1
total 8
drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
-rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
-rw-r--r--   1 a  b 0B  1 Mar 16:05 .lock
drwxr-xr-x   3 a  b96B  1 Mar 16:43
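
For reference, the exception type in the trace is what plain Java I/O produces when the parent directory of the target file does not exist, which would be consistent with a task directory 0_0 being absent while 1_1 is present. A minimal sketch of just that file-system behavior follows; it is not the Streams checkpoint code, and the class and method names are hypothetical.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class MissingTaskDirSketch {

    // Creates a temporary "state directory" containing only the given task directory.
    static File newStateDirWithTask(String taskId) {
        try {
            File stateDir = Files.createTempDirectory("kafka-stream-sketch").toFile();
            if (!new File(stateDir, taskId).mkdir()) {
                throw new IllegalStateException("could not create task dir " + taskId);
            }
            return stateDir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if opening <stateDir>/<taskId>/.checkpoint.tmp for writing
    // fails with FileNotFoundException, i.e. the task directory is missing.
    static boolean checkpointWriteFails(File stateDir, String taskId) {
        File tmp = new File(new File(stateDir, taskId), ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write('0');
            return false;
        } catch (FileNotFoundException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File stateDir = newStateDirWithTask("1_1");
        System.out.println("0_0 fails: " + checkpointWriteFails(stateDir, "0_0"));
        System.out.println("1_1 fails: " + checkpointWriteFails(stateDir, "1_1"));
    }
}
```

This only shows why the write fails; it says nothing about why the 0_0 directory would be missing at shutdown.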



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-03-04 Thread John Roesler
Hi Jonathan,

Sorry to hear that the feature is causing you trouble as well, and that the
2.2 release candidate didn't seem to fix it.

I'll try and do a repro based on the code in your SO post tomorrow.

I don't think it's related to the duplicates, but that shutdown error is
puzzling. Can you print the topology (with topology.describe())? This
will tell us what is in task 1 (i.e., *1_*) of your program.



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-03-05 Thread John Roesler
Hi Jonathan,

Just a quick update: I have not been able to reproduce the duplicates issue
with the 2.2 RC, even with a topology very similar to the one you included
in your stackoverflow post.

I think we should treat this as a new bug. Would you mind opening a new
Jira bug ticket with some steps to reproduce the problem, and also exactly
the behavior you observe?



Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-03-08 Thread Jonathan Santilli
Sure John, I will document it.

Thanks a lot for your reply.
