Hi John,

On 1/24/19 3:18 PM, John Roesler wrote:
Hi Peter,

Thanks for the clarification.

When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't
think that Streams automatically registers a shutdown hook. In our examples
and demos, we register a shutdown hook "outside" of streams (right next to
the code that calls start() ).
Unless I missed something, a SIGTERM would still cause Streams to exit
abruptly, skipping flush and commit. This can cause apparent duplicates *if
you're not using EOS or if you're reading uncommitted transactions*.
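
A minimal sketch of registering such a shutdown hook "outside" of Streams. To keep it free of dependencies, a plain AutoCloseable stands in for the KafkaStreams instance; the names ShutdownHookSketch and closeOnShutdown are made up for illustration.

```java
// Hypothetical helper: registers a JVM shutdown hook that closes the given
// resource. In a Streams application the resource would be the KafkaStreams
// instance, so that close() runs when the JVM receives SIGTERM.
final class ShutdownHookSketch {

    // Returns the hook thread so that callers can inspect or deregister it.
    static Thread closeOnShutdown(AutoCloseable resource) {
        Thread hook = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                // Nothing useful can be done this late in JVM shutdown.
                e.printStackTrace();
            }
        }, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

With a real application this would be called right next to the code that calls start().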

The fact is that Spring, which I use to instantiate the KafkaStreams object, does that:

    @Bean(initMethod = "start", destroyMethod = "close")
    public KafkaStreams processorStreams(...

So when the JVM gets SIGTERM, the shutdown hook that Spring installs shuts down the ApplicationContext, which calls the "destroyMethod" of every registered bean...

And the duplicates are less apparent but still occur even in EOS mode... But they are not actual duplicates. They are duplicates only by windowed key; the values are different...


The reason is that, upon restart, the suppression buffer can only
"remember" what got sent & committed to its changelog topic before.

The scenario I have in mind is:

...
* buffer state X
...
* flush state X to buffer changelog
...
* commit transaction T0; start new transaction T1
...
* emit final result X (in uncommitted transaction T1)
...
* crash before flushing to the changelog the fact that state X was emitted.
Also, transaction T1 gets aborted, since we crash before committing.
...
* restart, restoring state X again from the changelog (because the emit
didn't get committed)
* start transaction T2
* emit final result X again (in uncommitted transaction T2)
...
* commit transaction T2
...
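
The scenario above can be replayed with a toy model. This is not Kafka code: the class EosReplaySketch and its methods are invented for illustration, and a transaction is reduced to a label that is either committed or not.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the scenario: every output record is tagged with the
// transaction that produced it, and a transaction is either committed
// or (implicitly) aborted.
final class EosReplaySketch {

    static final class Record {
        final String value;
        final String txn;
        Record(String value, String txn) { this.value = value; this.txn = txn; }
    }

    final List<Record> outputTopic = new ArrayList<>();
    final List<String> committedTxns = new ArrayList<>();

    void emit(String value, String txn) { outputTopic.add(new Record(value, txn)); }

    void commit(String txn) { committedTxns.add(txn); }

    // With readCommitted == false the consumer sees every record; with
    // readCommitted == true it sees only records of committed transactions.
    List<String> read(boolean readCommitted) {
        List<String> seen = new ArrayList<>();
        for (Record r : outputTopic) {
            if (!readCommitted || committedTxns.contains(r.txn)) {
                seen.add(r.value);
            }
        }
        return seen;
    }

    static EosReplaySketch runScenario() {
        EosReplaySketch s = new EosReplaySketch();
        s.commit("T0");          // state X was flushed to the changelog before this
        s.emit("final X", "T1"); // emitted, then crash: T1 is never committed
        s.emit("final X", "T2"); // after restart X is restored and emitted again
        s.commit("T2");
        return s;
    }
}
```

In this model, runScenario().read(false) contains "final X" twice, while runScenario().read(true) contains it once.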

So, the result gets emitted twice, but the first time is in an aborted
transaction. This leads me to another clarifying question:

Based on your first message, it seems like the duplicates you observe are
in the output topic. When you read the topic, do you configure your
consumer with "read committed" mode? If not, you'll see "results" from
uncommitted transactions, which could explain the duplicates.
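
For reference, a sketch of the consumer setting in question, built with plain java.util.Properties so that it has no dependencies. The helper name and its parameters are placeholders; "isolation.level" is the consumer config key, and its default value is "read_uncommitted".

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties for reading the
// output topic of an EOS-enabled Streams application.
final class ReadCommittedConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Hide records that belong to aborted or still-open transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }
}
```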

So when EOS is enabled, the output topics are used in a transactional manner. The consumer of such a topic should then enable read_committed semantics...

That would do if my problem were about seeing duplicates of final windowing results. That is not my problem. My problem is that upon restart of the processor, I see some non-final window aggregations, followed by final aggregations for the same windowed key. That's harder to tolerate in an application. If it were just duplicates of the "correct" aggregation, I could ignore the 2nd and subsequent messages for the same windowed key, but if I first get a non-final aggregation, I cannot simply ignore the 2nd occurrence of the same windowed key. I must cope with "replacing the previous aggregation with a new version of it" in the app. This means that suppression of non-final results does not buy me anything, since it does not guarantee that only final results are emitted.

Is it possible that non-final windowed aggregations are emitted in some scenario, but then that transaction is rolled back, so I would not see the non-final aggregations if I enabled read_committed isolation on the consumer?

I think I'll have to reinstate the demo and try that...

Stay tuned.

Regards, Peter


Likewise, if you were to attach a callback, like "foreach" downstream of
the suppression, you would see duplicates in the case of a crash. Callbacks
are a general "hole" in EOS, which I have some ideas to close, but that's a
separate topic.

There may still be something else going on, but I'm trying to start with
the simpler explanations.

Thanks again,
-John

Thanks,
-John

On Wed, Jan 23, 2019 at 5:11 AM Peter Levart <peter.lev...@gmail.com> wrote:

Hi John,

Sorry I haven't had time to prepare the minimal reproducer yet. I still
have plans to do it though...

On 1/22/19 8:02 PM, John Roesler wrote:
Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?
I start it as SpringBoot application from IDEA and then stop it with the
red square button. It does initiate the shutdown sequence before
exiting... So I think it is by SIGTERM which initiates JVM shutdown
hook(s).

* have you tried this with EOS enabled? I can imagine some ways that there
could be duplicates, but they should be impossible with EOS enabled.
Yes, I have EOS enabled.

Thanks for your help,
-John
Regards, Peter

On Mon, Jan 14, 2019 at 1:20 PM John Roesler <j...@confluent.io> wrote:

Hi Peter,

I see your train of thought, but the actual implementation of the
window store is structured differently from your mental model.
Unlike Key/Value stores, we know that the records in a window
store will "expire" on a regular schedule, and also that every single
record will eventually expire. With this in mind, we have implemented
an optimization to avoid a lot of compaction overhead in RocksDB, as
well as saving on range scans.

Instead of storing everything in one database, we open several
databases and bucket windows into them. Then, when windows
expire, we just ignore the records (i.e., the API makes them
unreachable, but we don't actually delete them). Once all the windows
in a database are expired, we just close and delete the whole database.
Then, we open a new one for new windows. If you look in the code, these
databases are called "segments".
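
A toy model of this bucketing may help. It is not the actual Kafka Streams code: SegmentedStoreSketch is an invented name, an in-memory map stands in for each RocksDB instance, and expiry is measured against the highest window start seen so far.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Windows are bucketed into segments by start time. Expired windows are
// hidden by the API, and a segment is dropped as a whole once every window
// it can contain has expired.
final class SegmentedStoreSketch {
    private final long segmentIntervalMs;
    private final long retentionMs;
    // segment id -> ("key@windowStart" -> value)
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    private long observedTimeMs = 0L;

    SegmentedStoreSketch(long segmentIntervalMs, long retentionMs) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.retentionMs = retentionMs;
    }

    void put(String key, long windowStartMs, String value) {
        observedTimeMs = Math.max(observedTimeMs, windowStartMs);
        if (isExpired(windowStartMs)) {
            return; // too old to be stored at all
        }
        long segmentId = windowStartMs / segmentIntervalMs;
        segments.computeIfAbsent(segmentId, id -> new HashMap<>())
                .put(key + "@" + windowStartMs, value);
        dropExpiredSegments();
    }

    // Expired windows are unreachable even while their segment still exists.
    String fetch(String key, long windowStartMs) {
        if (isExpired(windowStartMs)) {
            return null;
        }
        Map<String, String> segment = segments.get(windowStartMs / segmentIntervalMs);
        return segment == null ? null : segment.get(key + "@" + windowStartMs);
    }

    int segmentCount() { return segments.size(); }

    private boolean isExpired(long windowStartMs) {
        return windowStartMs < observedTimeMs - retentionMs;
    }

    private void dropExpiredSegments() {
        // Segments strictly below this id contain only expired windows.
        long minLiveSegment = (observedTimeMs - retentionMs) / segmentIntervalMs;
        segments.headMap(minLiveSegment, false).clear();
    }
}
```

Dropping a whole segment replaces many per-record deletes, which is where the saving on compaction comes from.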

Thus, I don't think that you should attempt to use the built-in window
stores as you described. Instead, it should be straightforward to
implement your own StateStore with a layout that's more favorable to
your desired behavior.

You should also be able to set up the change log the way you need.
Explicitly removed entries would also get removed from the log, if
it's a compacted log.
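
The effect of compaction on explicitly removed entries can be sketched as follows. This is a simplified model with invented names (CompactionSketch, LogRecord): a record with a null value acts as a tombstone, and the model drops the key immediately, whereas a real broker keeps tombstones around for a while (delete.retention.ms) before removing them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a compacted changelog: only the latest value per key
// survives, and a null value (tombstone) removes the key entirely.
final class CompactionSketch {

    static final class LogRecord {
        final String key;
        final String value; // null marks a tombstone
        LogRecord(String key, String value) { this.key = key; this.value = value; }
    }

    // Returns what is left of the log after compaction, in first-seen key order.
    static Map<String, String> compact(List<LogRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (LogRecord r : log) {
            if (r.value == null) {
                latest.remove(r.key);
            } else {
                latest.put(r.key, r.value);
            }
        }
        return latest;
    }
}
```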

Actually, what you're describing is *very* similar to the implementation
for suppress. I might actually suggest that you just copy the
suppression implementation and adapt it to your needs, or at the very
least, study how it works. In doing so, you might actually discover the
cause of the bug yourself!

I hope this helps, and thanks for your help,
-John


On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <peter.lev...@gmail.com>
wrote:

Hi John,

Thank you very much for explaining how WindowStore works. I have some
more questions...

On 1/10/19 5:33 PM, John Roesler wrote:
Hi Peter,

Regarding retention, I was not referring to log retention, but to the
window store retention.
Since a new window is created every second (for example), there are in
principle an unbounded number of windows (the longer the application
runs, the more windows there are, with no end).
However, we obviously can't store an infinite amount of data, so the
window definition includes a retention period. By default, this is 24
hours. After the retention period elapses, all of the data for the
window is purged to make room for new windows.
Right. Would the following work for example:

- configure retention of WindowStore to be "infinite"
- explicitly remove records from the store when windows are flushed out
- configure WindowStore log topic for compacting

Something like the following:

           Stores
               .windowStoreBuilder(
                   Stores.persistentWindowStore(
                       storeName,
                       // retentionPeriod, roughly 1000 years; Duration.of()
                       // rejects estimated units such as ChronoUnit.YEARS
                       Duration.ofDays(365_000L),
                       Duration.ofSeconds(10), // windowSize
                       false
                   ),
                   keySerde, valSerde
               )
               .withCachingEnabled()
               .withLoggingEnabled(
                   Map.of(
                       TopicConfig.CLEANUP_POLICY_CONFIG,
                       TopicConfig.CLEANUP_POLICY_COMPACT
                   )
               );

In the above scenario, would:

- the on-disk WindowStore be kept bounded (there could be some very old
entries in it but majority will be new - depending on the activity of
particular input keys)
- the log topic be kept bounded (explicitly removed entries would be
removed from compacted log too)

I'm moving away from DSL partly because I have some problems with
suppression (which I hope we'll be able to fix) and partly because the
DSL can't give me the complicated semantics that I need for the
application at hand. I tried to capture what I need in a custom
Transformer here:

https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f

Your knowledge of how WindowStore works would greatly help me decide if
this is a workable idea.

So what I meant was that if you buffer some key "A" in window (Monday
09:00:00) and then get no further activity for A for over 24 hours, then
when you do get that next event for A, say at (Tuesday 11:00:00), you'd
do the scan but find nothing, since your buffered state would already
have been purged from the store.
Right. That would be the case when WindowStore was configured with
default retention of 24 hours. A quick question: What does window size
configuration for WindowStore (see above) do? Does it have to be
synchronized with the size of windows stored in it?

The way I avoided this problem for Suppression was to organize the data
by timestamp instead of by key, so on *every* update I can search for all
the keys that are old enough and emit them.
I also don't use a window store, so I don't have to worry about the
retention time.
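
A toy model of a buffer organized by timestamp, to illustrate the idea. It is not the actual suppression implementation: TimeOrderedBufferSketch is an invented name, grace periods are ignored, and a window counts as closed once the highest record timestamp seen so far reaches its end time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Buffered results are indexed by window end time, so each update can emit
// every closed window, whatever key the triggering record carries.
final class TimeOrderedBufferSketch {
    // window end time -> (key -> latest buffered value)
    private final TreeMap<Long, Map<String, String>> byTime = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    // Buffers the update, then returns all results whose window has closed.
    List<String> update(String key, long windowEndMs, long recordTimeMs, String value) {
        streamTimeMs = Math.max(streamTimeMs, recordTimeMs);
        byTime.computeIfAbsent(windowEndMs, t -> new TreeMap<>()).put(key, value);

        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Map<String, String>>> it =
                byTime.headMap(streamTimeMs, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Map<String, String>> closed = it.next();
            for (Map.Entry<String, String> kv : closed.getValue().entrySet()) {
                emitted.add(kv.getKey() + "@" + closed.getKey() + "=" + kv.getValue());
            }
            it.remove(); // emitted results leave the buffer
        }
        return emitted;
    }
}
```

Here an event for key "B" is enough to flush a closed window of key "A", which is what a per-key scan cannot do.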

To answer your question about the window store's topic, it configures a
retention time the same length as the store's retention time (and the
keys are the full windowed key, including the window start time), so
it'll have roughly the same size bound as the store itself.

Would explicitly removed entries from WindowStore be removed from log
too if it was a compacting log?

Back to the process of figuring out what might be wrong with
Suppression, I don't suppose you would be able to file a Jira and upload
a repro program? If not, that's ok. I haven't been able to reproduce the
bug yet, but it seems like it's happening somewhat consistently for you,
so I should be able to get it to happen eventually.

Thanks, and sorry again for the troubles.
-John
I can prepare a minimal reproducer. No problem...

Regards, Peter

On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <peter.lev...@gmail.com>
wrote:
On 1/8/19 12:57 PM, Peter Levart wrote:
Hi John,

On 1/8/19 12:45 PM, Peter Levart wrote:
I looked at your custom transformer, and it looks almost correct to me.
The only flaw seems to be that it only looks for closed windows for the
key currently being processed, which means that if you have key "A"
buffered, but don't get another event for it for a while after the
window closes, you won't emit the final result. This might actually take
longer than the window retention period, in which case, the data would
be deleted without ever emitting the final result.
So in the DSL case, the suppression works by flushing *all* of the
"ripe" windows in the whole buffer whenever a single event comes in with
a recent enough timestamp, regardless of the key of that event?

Is the buffer shared among processing tasks or does each task
maintain its own private buffer that only contains its share of data
pertaining to assigned input partitions? In case the tasks are
executed on several processing JVM(s) the buffer can't really be
shared, right? In that case a single event can't flush all of the
"ripe" windows, but just those that are contained in the task's part
of the buffer...
Just a question about your comment above:

/"This might actually take longer than the window retention period, in
which case, the data would be deleted without ever emitting the final
result"/

Are you talking about the buffer log topic retention? Aren't log
topics configured to "compact" rather than "delete" messages? So the
last "version" of the buffer entry for a particular key should stay
forever? What are the keys in the suppression buffer log topic? Are
they a pair of (timestamp, key)? Probably not, since in that case the
compacted log would grow indefinitely...

Another question:

What are the keys in WindowStore's log topic? If the input keys to the
processor that uses such a WindowStore consist of a bounded set of
values (for example user ids), would the compacted log of such a
WindowStore also be bounded?
In case the key of the WindowStore log topic is (timestamp, key), would
explicitly deleting flushed entries from the WindowStore (by putting a
null value into the store) keep the compacted log bounded? In other
words, does the WindowStore log topic support a special kind of
"tombstone" message that effectively removes the key from the compacted
log?

In that case, my custom processor could keep entries in its WindowStore
for as long as needed, depending on the activity of a particular input
key...

Regards, Peter



