Hmm. I've seen this manifest in some other tweaked versions of Sessions.
Your invariants are right. In fact, the Nexmark queries have auctions that
truncate in a similar way. This prompted
https://issues.apache.org/jira/browse/BEAM-654.  I think we have not really
nailed down the right spec for merging, and we certainly aren't enforcing
it. To be robust, your merging should be associative and commutative, which
means that you can't have an "end of session" event that contradicts a
merge that occurred. OTOH I also know that Tyler has hacked window
functions that split... it is mostly unexplored, semantically.
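
One way to make "associative and commutative" concrete is to treat a merge as a binary operation on windows and brute-force both laws over a few sample windows. This is a minimal Java sketch, not Beam API. `Win`, `span`, `isCommutativeAndAssociative` and the order-dependent `rightEndWins` rule are hypothetical names. Windows are assumed to be half-open `[start, end)` intervals in milliseconds.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative model only: windows are half-open [start, end) intervals in
// milliseconds, and a "merge" is any binary operator on them.
class MergeLaws {

  // Hypothetical stand-in for an interval window.
  record Win(long start, long end) {}

  // Span merge, as plain Sessions does for overlapping windows: the smallest
  // interval covering both inputs.
  static Win span(Win a, Win b) {
    return new Win(Math.min(a.start(), b.start()), Math.max(a.end(), b.end()));
  }

  // Brute-force check of commutativity and associativity over every pair and
  // triple drawn from the sample windows.
  static boolean isCommutativeAndAssociative(BinaryOperator<Win> merge, List<Win> samples) {
    for (Win a : samples) {
      for (Win b : samples) {
        if (!merge.apply(a, b).equals(merge.apply(b, a))) {
          return false; // order of arguments changed the result
        }
        for (Win c : samples) {
          Win left = merge.apply(merge.apply(a, b), c);
          Win right = merge.apply(a, merge.apply(b, c));
          if (!left.equals(right)) {
            return false; // grouping changed the result
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Win> samples = List.of(new Win(0, 100), new Win(50, 75), new Win(90, 120));
    System.out.println(isCommutativeAndAssociative(MergeLaws::span, samples)); // true
    // Hypothetical order-dependent rule: the second argument dictates the end.
    BinaryOperator<Win> rightEndWins =
        (a, b) -> new Win(Math.min(a.start(), b.start()), b.end());
    System.out.println(isCommutativeAndAssociative(rightEndWins, samples)); // false
  }
}
```

The span merge passes. The rule in which whichever window is passed second dictates the end fails the commutativity check on the first unequal pair, which is the kind of order dependence that makes an "end of session" event contradict a merge that already happened.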

About the error, this may help debug: The "state address windows" for a
given merged window are all the windows that contribute to it. This means
that when windows A and B merge to become a window AB, we can leave the
accumulated state stored with A and B and just note that when we read from
AB we actually have to read from both A and B*. So suppose windows A and B
are about to merge. Before merge, the state address window map is:

A -> [A]
B -> [B]

After merge, there is a new window AB and a "window to state address window"
mapping:

AB -> [A, B]
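
The bookkeeping above can be modeled as a map from each active window to the set of windows its state is stored under. This is a minimal Java sketch under that reading. `StateAddressMap`, `add` and `merge` are hypothetical names for illustration, not the internals of Beam's `MergingActiveWindowSet`.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of the "window -> state address windows" map. Window names are
// plain strings; this is an illustration, not Beam's implementation.
class StateAddressMap {
  final Map<String, Set<String>> map = new TreeMap<>();

  // A fresh, unmerged window stores its state under itself: W -> [W].
  void add(String window) {
    map.put(window, new LinkedHashSet<>(Set.of(window)));
  }

  // In this model merging moves no state: the merged window records every
  // state address window of its sources, and the sources' entries are removed.
  void merge(String result, String... sources) {
    Set<String> addresses = new LinkedHashSet<>();
    for (String source : sources) {
      Set<String> removed = map.remove(source);
      if (removed == null) {
        throw new IllegalArgumentException(source + " is not an active window");
      }
      addresses.addAll(removed);
    }
    map.put(result, addresses);
  }

  public static void main(String[] args) {
    StateAddressMap m = new StateAddressMap();
    m.add("A");
    m.add("B");
    System.out.println(m.map); // {A=[A], B=[B]}
    m.merge("AB", "A", "B");
    System.out.println(m.map); // {AB=[A, B]}
  }
}
```

Merging AB with a later window C in the same model would give `ABC -> [A, B, C]`, so reads of ABC still go to the original windows.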

The error means that there is more than one merged window that will read
data from a pre-merged window. So there is a situation like

AB -> [A, B]
BC -> [B, C]

This is not intended to happen. It would be the consequence of B merging
into two different new windows. Hence it is an internal error, most likely
a bug or a mismatch in assumptions. Note that this code/logic is
shared by all runners. I do think you can write a WindowFn that induces it.
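
The invariant that the error reports can be sketched as a check that no pre-merge window appears in two state address sets. This is again a hypothetical Java model. `StateAddressInvariant` and `findSharedStateAddress` are illustrative names. The real check is `MergingActiveWindowSet.checkInvariants` from the stack trace in this thread, and its code is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the invariant behind "<window> is in more than one state address
// window set": each pre-merge window may back at most one active window.
class StateAddressInvariant {

  // Returns the first state address window found in two different sets,
  // or null if the map is consistent.
  static String findSharedStateAddress(Map<String, Set<String>> windowToStateAddresses) {
    Map<String, String> owner = new HashMap<>();
    for (Map.Entry<String, Set<String>> e : windowToStateAddresses.entrySet()) {
      for (String address : e.getValue()) {
        String previous = owner.putIfAbsent(address, e.getKey());
        if (previous != null) {
          return address; // already claimed by another active window
        }
      }
    }
    return null;
  }

  static void checkInvariants(Map<String, Set<String>> windowToStateAddresses) {
    String shared = findSharedStateAddress(windowToStateAddresses);
    if (shared != null) {
      throw new IllegalStateException(shared + " is in more than one state address window set");
    }
  }

  public static void main(String[] args) {
    // Consistent: AB -> [A, B], C -> [C]
    checkInvariants(Map.of("AB", Set.of("A", "B"), "C", Set.of("C")));
    // Broken: B was merged into both AB and BC
    try {
      checkInvariants(Map.of("AB", Set.of("A", "B"), "BC", Set.of("B", "C")));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // B is in more than one state address window set
    }
  }
}
```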

Kenn

*this was intended to be a performance optimization, but eagerly copying
the data turned out to be faster, so now it is a legacy compatibility thing.
I think we could remove it, but changing this code is tricky.

On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon <atdi...@gmail.com> wrote:

> What I'm attempting is a variation on Session windows in which there may
> exist a "terminal" element in the stream that immediately stops the session
> (or perhaps after some configured delay.)
>
> My implementation behaves just like Sessions until any such "terminal"
> element is encountered in which case I mark the window as "terminal" and
> all windows "merge down" such that any terminal windows get to dictate the
> Interval.end()/Window.maxTimestamp().
>
> So, as a trivial example, if I have windows W1 [0, 100) and W2 [50, 75)
> with terminal = true, then the merged result will be W3 [0, 75).
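
The "merge down" rule above can be sketched as a binary merge on plain intervals. The WindowFn code was not posted, so this rests on assumptions: `Term` is a hypothetical stand-in for the terminal-aware window, windows are half-open `[start, end)` in milliseconds, and when both inputs are terminal the earlier end wins.

```java
// Sketch of a terminal-aware "merge down" on plain intervals. Term is a
// hypothetical type, not the poster's class and not a Beam API.
class TerminalMerge {

  record Term(long start, long end, boolean terminal) {}

  // Start is the earliest start. If any input is terminal, a terminal end
  // dictates the merged end (assumption: the earliest one when both are
  // terminal); otherwise this behaves like Sessions and takes the latest end.
  // The result is terminal if either input is.
  static Term merge(Term a, Term b) {
    long start = Math.min(a.start(), b.start());
    long end;
    if (a.terminal() && b.terminal()) {
      end = Math.min(a.end(), b.end());
    } else if (a.terminal()) {
      end = a.end();
    } else if (b.terminal()) {
      end = b.end();
    } else {
      end = Math.max(a.end(), b.end());
    }
    return new Term(start, end, a.terminal() || b.terminal());
  }

  public static void main(String[] args) {
    Term w1 = new Term(0, 100, false);
    Term w2 = new Term(50, 75, true);
    System.out.println(merge(w1, w2)); // Term[start=0, end=75, terminal=true]
  }
}
```

Because the result depends only on the set of inputs (earliest start; earliest terminal end if any input is terminal, otherwise latest end), this particular rule is commutative and associative. Whether it also preserves (II) depends on the elements already in the source windows.
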
>
> I've been successful doing this so far but I've been inferring some
> invariants about windows that I'm not sure are official or documented
> anywhere.
>
> The invariants that I've inferred go like this:
>
> (I) Definition. An element is "in" window W if it originated in W or in a
> window that was merged (recursively) into W.
>
> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
> W.maxTimestamp().
>
> So far, I think this is obvious and true stuff (I hope). (It would
> actually be great if there were a way for II not to have to hold,
> but that is a whole other discussion, I think.)
>
> The main invariant I'm trying to formalize is one that allows me to "merge
> down" -- i.e., to merge in such a way that the merged window's
> (mergedResult's) maxTimestamp *is less than* one of the source's
> (toBeMerged's) windows' maxTimestamp.
>
> The (undocumented?) invariant I've been working from goes something like
> this:
>
> (III) Corollary. Windows W1 and W2 can merge such that either window's
> maxTimestamp() is regressed (moved backward in time, aka "merge down") in
> the merged window; however, they cannot merge such that (II) is ever
> violated.
>
> Is this correct?
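
Invariants (II) and (III) as proposed here (not as confirmed Beam rules) can be sketched as a check to run before a merge that moves the end backward. Assumptions: half-open windows in milliseconds with maxTimestamp = end - 1, as in Beam's `IntervalWindow`, and made-up element timestamps. `MergeDownCheck` and `satisfiesII` are illustrative names.

```java
import java.util.List;

// Sketch of invariants (II) and (III) on plain numbers: a merge may pull the
// end of a window backward only if every element already "in" the sources
// still satisfies timestamp <= maxTimestamp of the merged window.
class MergeDownCheck {

  // Assumption: mirrors IntervalWindow, where maxTimestamp is end minus 1 ms.
  static long maxTimestamp(long end) {
    return end - 1;
  }

  // Invariant (II) for a window ending at windowEnd.
  static boolean satisfiesII(long windowEnd, List<Long> elementTimestamps) {
    long max = maxTimestamp(windowEnd);
    for (long ts : elementTimestamps) {
      if (ts > max) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Hypothetical timestamps: W1 [0, 100) holds an element at 90 and
    // W2 [50, 75) holds one at 60, so both are "in" the merged window.
    List<Long> inMerged = List.of(90L, 60L);
    // Merging down to [0, 75) would strand the element at 90.
    System.out.println(satisfiesII(75, inMerged)); // false
    // Without the element at 90 the same merge keeps (II) intact.
    System.out.println(satisfiesII(75, List.of(60L))); // true
  }
}
```
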
>
> (If this can be confirmed, I'll go back and ensure I'm not
> violating the merge() precondition and these invariants, and post some code
> if needed.) Thank you for your assistance here!
>
>
> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <re...@google.com> wrote:
>
>> Have you used Dataflow's update feature on this pipeline? Also, do
>> you have the code for your WindowFn?
>>
>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <atdi...@gmail.com> wrote:
>>
>>> Dataflow. (See stacktrace)
>>>
>>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Which runner are you using?
>>>>
>>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <atdi...@gmail.com> wrote:
>>>>
>>>>> I get an IllegalStateException "<window> is in more than one state
>>>>> address window set" (stacktrace below).
>>>>>
>>>>> What does this mean? What invariant of custom window implementation
>>>>> & merging am I violating?
>>>>>
>>>>> Thank you for any advice.
>>>>>
>>>>> ```
>>>>> java.lang.IllegalStateException:
>>>>> {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
>>>>> than one state address window set
>>>>> at
>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>>> (Preconditions.java:588)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
>>>>> (MergingActiveWindowSet.java:334)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
>>>>> (MergingActiveWindowSet.java:88)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
>>>>> (ReduceFnRunner.java:380)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
>>>>> (StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>> ...
>>>>> ```
>>>>>
>>>>
