Hello,

We did end up opening a GCP ticket. This is the advice they gave us:

"I understand that you are getting an error message [1] when working with a
custom windowing function. You are trying the “bounded session” and “per key
fixed window” options described in the third-party documentation.


I researched the error further, and it appears to be an internal error: a
sanity check is failing.


As windows merge, they create a new window, but their state need not be
copied. Instead, the window is mapped to a list of windows to find the
state in.


Suppose windows A and B are about to merge. Before merge, the state address
window map is:


A -> [A]

B -> [B]


After the merge, there is a new window AB, and the map becomes:


A -> [A]

B -> [B]

AB -> [A, B]


The error means that there is more than one merged window that will read
data from a pre-merged window. So there is a situation like


AB -> [A, B]

BC -> [B, C]


This is not intended to happen."
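
To make the description above concrete, here is a minimal sketch of the kind of check that seems to be failing. It is not Beam's actual code: the class name, the method names, and the use of plain strings for windows are all made up for illustration, and it assumes that only the currently active (merged) windows are kept as keys in the map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "state address window" map; not Beam's implementation.
public class StateAddressWindowSketch {

    // Returns a pre-merge window that is listed under more than one active
    // window, or null if every pre-merge window has exactly one owner.
    static String findSharedStateAddress(Map<String, List<String>> activeToStateAddress) {
        Map<String, String> owner = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : activeToStateAddress.entrySet()) {
            for (String stateAddress : entry.getValue()) {
                String previousOwner = owner.put(stateAddress, entry.getKey());
                if (previousOwner != null) {
                    return stateAddress;
                }
            }
        }
        return null;
    }

    // Mirrors the wording of the error in the stack trace (the check itself is an assumption).
    static void checkInvariants(Map<String, List<String>> activeToStateAddress) {
        String shared = findSharedStateAddress(activeToStateAddress);
        if (shared != null) {
            throw new IllegalStateException(
                shared + " is in more than one state address window set");
        }
    }

    public static void main(String[] args) {
        // Healthy case: A and B merged into AB, C untouched.
        Map<String, List<String>> healthy = new LinkedHashMap<>();
        healthy.put("AB", List.of("A", "B"));
        healthy.put("C", List.of("C"));
        checkInvariants(healthy); // passes silently

        // Broken case from the support reply: B backs both AB and BC.
        Map<String, List<String>> broken = new LinkedHashMap<>();
        broken.put("AB", List.of("A", "B"));
        broken.put("BC", List.of("B", "C"));
        try {
            checkInvariants(broken);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the thing to look for in our WindowFn is a merge that leaves one original window contributing to two different merged results.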

Does this sound like a reasonable explanation of what is happening here? We
haven't gotten around to debugging it yet, but we may take a closer look soon.

Thanks,

Sahith




On Wed, Mar 22, 2023 at 2:15 PM Svetak Sundhar <svetaksund...@google.com>
wrote:

> Hi Sahith,
>
> It isn't immediately obvious to me what the error might be, though I was
> able to sift through the stacktrace and find areas of the codebase that it
> touches (
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java#L412
> ).
>
> Perhaps we can schedule a call to look into the code and learn more about
> what might be going wrong? Alternatively, you could file a GCP Support
> ticket, that'll give us access to look into the Dataflow job to see if we
> can find any more evidence of what might be going wrong.
>
> Thanks,
>
>
> Svetak Sundhar
>
>   Technical Solutions Engineer, Data
> svetaksund...@google.com
>
>
>
> On Fri, Mar 17, 2023 at 1:02 PM Sahith Nallapareddy via dev <
> dev@beam.apache.org> wrote:
>
>> Hello,
>>
>> We are working on writing a custom windowing function. The functionality
>> is similar to the one described in this book
>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>> (the bounded session and per key fixed window is what we are trying).
>> However, we are not sure what is wrong with our implementation as we run
>> into this error in Dataflow. Error message from worker:
>> java.lang.IllegalStateException:
>> [2023-03-14T21:28:46.639Z..2023-03-14T21:58:46.639Z) is in more than one
>> state address window set
>>
>> Can anyone explain what this error means and how we can reproduce it? We
>> have tests set up and they pass; the error only appears in Dataflow.
>>
>>
>> Full stack trace:
>>
>> java.lang.IllegalStateException: [2023-03-14T21:28:54.817Z..2023-03-14T21:43:54.817Z) is in more than one state address window set
>>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:335)
>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:89)
>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:385)
>>   at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:98)
>>   at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>>   at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>>   at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>>   at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>>   at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>>   at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>>   at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
>>   at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>>   at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
>>   at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>   at java.base/java.lang.Thread.run(Thread.java:834)
>>
>> Thanks,
>>
>> Sahith
>>
>
