Hi John,

On 1/7/19 9:10 PM, John Roesler wrote:
Hi Peter,

Sorry, I just now have seen this thread.

You asked if this behavior is unexpected, and the answer is yes.
Suppressed.untilWindowCloses is intended to emit only the final result,
regardless of restarts.

You also asked how the suppression buffer can resume after a restart, since
it's not persistent.
The answer is the same as for in-memory stores. The state of the store (or
buffer, in this case)
is persisted to a changelog topic, which is re-read on restart to re-create
the exact state prior to shutdown.
"Persistent" in the store nomenclature refers only to "persistent on the
local disk".

Just to confirm your response regarding the buffer size:
While it is better to use the public API ("Suppressed.BufferConfig.unbounded()"),
yes, your buffer was already unbounded.
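
For reference, a minimal DSL sketch of the suppression being discussed (the
2-second window, grace period, serdes and topic names are placeholders, not
the actual configuration from this thread):

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
Serde<List<Integer>> listSerde = /* some serde for List<Integer>, assumed to exist */ null;

builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Integer()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(2)).grace(Duration.ofSeconds(1)))
    .aggregate(
        ArrayList::new,
        (key, value, agg) -> { agg.add(value); return agg; },
        Materialized.with(Serdes.String(), listSerde))
    // buffer updates per window and emit only the final result once the
    // window closes (end + grace has passed); the buffer has no size limit
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((windowedKey, finalList) ->
        System.out.println(windowedKey + " -> " + finalList));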

I looked at your custom transformer, and it looks almost correct to me. The
only flaw seems to be that it only looks for closed windows for the key
currently being processed, which means that if you have key "A" buffered, but
don't get another event for it for a while after the window closes, you won't
emit the final result. This might actually take longer than the window
retention period, in which case the data would be deleted without ever
emitting the final result.
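
A rough sketch of the difference (a reconstruction for illustration, not the
actual transformer from this thread; the store name, key/value types and
grace constant are assumptions): the important part is that the flush scans
the whole buffer, not just the current record's key.

import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class SuppressUntilWindowCloses implements
        Transformer<Windowed<String>, List<Integer>, KeyValue<Windowed<String>, List<Integer>>> {

    private static final long GRACE_MS = 1_000L;   // assumed grace period

    private ProcessorContext context;
    // the store must be registered with a windowed key serde, e.g.
    // WindowedSerdes.timeWindowedSerdeFrom(String.class)
    private KeyValueStore<Windowed<String>, List<Integer>> buffer;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.buffer = (KeyValueStore<Windowed<String>, List<Integer>>)
            context.getStateStore("suppress-buffer");
    }

    @Override
    public KeyValue<Windowed<String>, List<Integer>> transform(
            Windowed<String> key, List<Integer> value) {
        buffer.put(key, value);   // remember the latest aggregate for this window

        // simplification: real code should track the max timestamp seen so far
        long observedStreamTime = context.timestamp();

        // flush ALL ripe windows in the buffer, regardless of the current key
        try (KeyValueIterator<Windowed<String>, List<Integer>> it = buffer.all()) {
            while (it.hasNext()) {
                KeyValue<Windowed<String>, List<Integer>> entry = it.next();
                if (entry.key.window().end() + GRACE_MS <= observedStreamTime) {
                    context.forward(entry.key, entry.value);  // final result
                    buffer.delete(entry.key);
                }
            }
        }
        return null;   // everything is emitted via forward()
    }

    @Override
    public void close() { }
}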

So in the DSL case, the suppression works by flushing *all* of the "ripe" windows in the whole buffer whenever a single event comes in with a recent enough timestamp, regardless of the key of that event?

Is the buffer shared among processing tasks, or does each task maintain its own private buffer that only contains its share of the data for its assigned input partitions? If the tasks are executed on several processing JVMs, the buffer can't really be shared, right? In that case a single event can't flush all of the "ripe" windows, only those contained in that task's part of the buffer...


You said you think it should be possible to get the DSL version working,
and I agree, since this is exactly what it was designed for. Do you mind
filing a bug in the "KAFKA" Jira project (
https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
keep the investigation organized that way.

Will do that.


In the meantime, I'll take another look at your logs above and try to
reason about what could be wrong.

Just one clarification... For example, you showed
[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172], sum: 138902
[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

Am I correct in thinking that the first, shorter list is the "incremental"
version, and the second is the "final" version? I think so, but am confused
by "INSTEAD OF".

It's the other way around. The first list (usually the longer one) is what has just been consumed, and the second is what had been consumed before that for the same key (I maintain a ConcurrentHashMap of consumed entries in the test and execute: secondList = map.put(key, firstList)).
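
In code, the bookkeeping looks roughly like this (a reconstruction for
illustration, not the actual test; class and method names are made up):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConsumedTracker {
    // key/window identifier -> list most recently consumed for it
    private final Map<String, List<Integer>> consumed = new ConcurrentHashMap<>();

    void onConsumed(String key, List<Integer> firstList) {
        // put() returns the previous mapping, so secondList is what had been
        // consumed earlier for the same key/window; firstList is what was
        // consumed just now
        List<Integer> secondList = consumed.put(key, firstList);
        if (secondList != null) {
            System.out.printf("APP Consumed: %s -> %s INSTEAD OF %s%n",
                              key, firstList, secondList);
        }
    }
}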

In the majority of cases, the consumed list is an incremental update of some previous version of the list (not necessarily a direct descendant) consumed before that, but, as mentioned, I have also observed the final window result before a processor restart and, after the restart, some previous version of the non-final window aggregation for the same key.

May I also note that there is some "jitter" in the input timestamps, because I'm trying to model a real use case where there will be several inputs to the system with only approximately synchronized clocks. The jitter is kept well below the TimeWindow grace period, so there should be no events consumed by the processor that belong to windows that have already been flushed.
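
To make the relationship concrete (the numbers are illustrative, not the
actual settings):

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

Duration windowSize = Duration.ofSeconds(2);
Duration grace      = Duration.ofSeconds(1);    // window accepts late events this long past its end
Duration maxJitter  = Duration.ofMillis(200);   // simulated producer clock skew, kept well below grace

TimeWindows windows = TimeWindows.of(windowSize).grace(grace);

// as long as this holds, no event should arrive for a window that has
// already been closed and flushed
assert maxJitter.compareTo(grace) < 0;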

Regards, Peter


Thanks for the report,
-John



On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <peter.lev...@gmail.com> wrote:


On 12/21/18 3:16 PM, Peter Levart wrote:
I also see some results that are actual non-final window aggregations
that precede the final aggregations. These non-final results are never
emitted out of order (for example, no such non-final result would ever
come after the final result for a particular key/window).
Absence of proof is not proof of absence... And I have since
observed (using the DSL variant, not the custom Transformer) an
occurrence of a non-final result that was emitted after a restart of
the streams processor, while the final result for the same key/window had
been emitted before the restart:

[pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
...
... restart ...
...
[pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648


The app logic cannot even rely on a guarantee that results are ordered,
then. This is really not usable until the bug is fixed.

Regards, Peter


