Hello,

We did end up opening a GCP ticket. This is the advice they gave us:
"I understand that you are getting an error message [1] when working with a custom windowing function. You are trying the options “bounded session” and “per key fixed window” as mentioned in the third party documentation.

I did further research regarding the error and it seems like an internal error, failing a sanity check. As windows merge, they create a new window, but their state need not be copied. Instead, the window is mapped to a list of windows to find the state in.

Suppose windows A and B are about to merge. Before merge, the state address window map is:

A -> [A]
B -> [B]

After merge, there is a new window AB and

A -> [A]
B -> [B]
AB -> [A, B]

The error means that there is more than one merged window that will read data from a pre-merged window. So there is a situation like

AB -> [A, B]
BC -> [B, C]

This is not intended to happen."

Does this sound reasonable as what is happening here? We haven't gotten around to debugging it yet, but we may take a closer look soon.

Thanks,
Sahith

On Wed, Mar 22, 2023 at 2:15 PM Svetak Sundhar <svetaksund...@google.com> wrote:

> Hi Sahith,
>
> It isn't immediately obvious to me what the error might be, though I was
> able to sift through the stacktrace and find areas of the codebase that it
> touches (
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java#L412
> ).
>
> Perhaps we can schedule a call to look into the code and learn more about
> what might be going wrong? Alternatively, you could file a GCP Support
> ticket, that'll give us access to look into the Dataflow job to see if we
> can find any more evidence of what might be going wrong.
>
> Thanks,
>
>
> Svetak Sundhar
>
> Technical Solutions Engineer, Data
> svetaksund...@google.com
>
>
>
> On Fri, Mar 17, 2023 at 1:02 PM Sahith Nallapareddy via dev <
> dev@beam.apache.org> wrote:
>
>> Hello,
>>
>> We are working on writing a custom windowing function.
>> The functionality
>> is similar to the one described in this book
>> https://www.oreilly.com/library/view/streaming-systems/9781491983867/ch04.html
>> (the bounded session and per key fixed window is what we are trying).
>> However, we are not sure what is wrong with our implementation as we run
>> into this error in dataflow: Error message from worker:
>> java.lang.IllegalStateException:
>> [2023-03-14T21:28:46.639Z..2023-03-14T21:58:46.639Z) is in more than one
>> state address window set
>>
>> Can anyone explain what this error means and how we can reproduce it? we
>> have tests setup and the tests pass fine, this only appears in dataflow
>>
>>
>> Full stack trace:
>>
>> java.lang.IllegalStateException: [2023-03-14T21:28:54.817Z..2023-03-14T21:43:54.817Z) is in more than one state address window set
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:335)
>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:89)
>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:385)
>>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:98)
>>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1445)
>>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1120)
>>     at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>     at java.base/java.lang.Thread.run(Thread.java:834)
>>
>> Thanks,
>>
>> Sahith
>>
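
For anyone trying to reproduce this outside Dataflow, the sanity check described in the support reply at the top of this message can be sketched roughly as below. This is only an illustration of the idea (each pre-merge window should be read by at most one merged window); it is not Beam's MergingActiveWindowSet code. The class name StateAddressSketch, the method findSharedStateAddress, and the use of plain strings for windows are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the "state address window" map from the support reply.
// Windows are plain strings here; a real pipeline would use window objects.
class StateAddressSketch {

    // Returns the first state address window that is listed under more than
    // one merged window, or null if every state address has a single owner.
    static String findSharedStateAddress(Map<String, List<String>> mergedToStateAddresses) {
        Map<String, String> ownerByStateAddress = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : mergedToStateAddresses.entrySet()) {
            for (String stateAddress : entry.getValue()) {
                String previousOwner = ownerByStateAddress.put(stateAddress, entry.getKey());
                if (previousOwner != null && !previousOwner.equals(entry.getKey())) {
                    return stateAddress;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The situation from the support reply: B feeds both AB and BC.
        Map<String, List<String>> overlapping = new LinkedHashMap<>();
        overlapping.put("AB", Arrays.asList("A", "B"));
        overlapping.put("BC", Arrays.asList("B", "C"));
        System.out.println("shared state address: " + findSharedStateAddress(overlapping));

        // A consistent map: every pre-merge window has exactly one reader.
        Map<String, List<String>> consistent = new LinkedHashMap<>();
        consistent.put("AB", Arrays.asList("A", "B"));
        consistent.put("C", Arrays.asList("C"));
        System.out.println("shared state address: " + findSharedStateAddress(consistent));
    }
}
```

A check like this could be run in a unit test against the merge results a custom windowing function produces for one key, to look for a pre-merge window that ends up under two merged windows. Whether that matches what the Dataflow worker sees is an assumption that would still need to be verified.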