I see. This is an interesting use case, and it seems quite common. I think the issue is that we are trying to use a trigger in the global window of an unbounded stream, which somehow leads Beam to use an AfterSynchronizedProcessingTime trigger. Would it be OK for you to replace the global window with a window that is large enough, say a few days, to simulate a global window? Or have you considered using a 5-second fixed window, so that updates are emitted within 5 seconds and we don't need the global window with a trigger at all?
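
To make the second option more concrete, here is a rough sketch in plain Java (deliberately not Beam API) of what a 5-second fixed window followed by a latest-per-key step would emit. The class, method, and field names are hypothetical and exist only for this illustration; timestamps are assumed to be milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation, NOT Beam code: all names here are hypothetical.
class FixedWindowLatest {

    /** One changelog update with an assumed millisecond timestamp. */
    static final class Update {
        final String key;
        final String value;
        final long timestampMillis;

        Update(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Start of the fixed window that contains the given timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Assigns updates to fixed windows and keeps the newest value per key in each window. */
    static Map<Long, Map<String, String>> latestPerKeyPerWindow(
            List<Update> updates, long windowSizeMillis) {
        Map<Long, Map<String, Update>> newest = new TreeMap<>();
        for (Update u : updates) {
            long start = windowStart(u.timestampMillis, windowSizeMillis);
            // Keep whichever update for this key has the later timestamp.
            newest.computeIfAbsent(start, s -> new TreeMap<>())
                  .merge(u.key, u, (a, b) -> b.timestampMillis >= a.timestampMillis ? b : a);
        }
        Map<Long, Map<String, String>> result = new TreeMap<>();
        newest.forEach((start, byKey) -> {
            Map<String, String> values = new TreeMap<>();
            byKey.forEach((k, u) -> values.put(k, u.value));
            result.put(start, values);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Update> updates = List.of(
            new Update("sensor-1", "v1", 1_000L),
            new Update("sensor-1", "v2", 4_000L),
            new Update("sensor-2", "a", 6_000L));
        // Prints {0={sensor-1=v2}, 5000={sensor-2=a}}
        System.out.println(latestPerKeyPerWindow(updates, 5_000L));
    }
}
```

In this sketch a window without any updates produces no entry at all, which may matter for a changelog that is only updated on manual re-configuration.
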

Thanks,
Xinyu

On Fri, Feb 12, 2021 at 7:14 AM Jan Bensien <stu128...@mail.uni-kiel.de> wrote:

> Hey,
>
> Thank you for your help. With your help I was able to identify the
> problem within my code. The problem is that my trigger followed by a
> Latest.perKey transformation results in an
> AfterSynchronizedProcessingTime trigger.
> The minimal example of this problem is the following:
> https://gist.github.com/janb15/8c536dbdbc1be40e2698122257f74760
> I did try this within the wordcount project using version 2.27.0. This
> did not fix the problem.
> The use case that I want to implement uses a changelog stream to enrich
> my input with further data using side inputs. The stream that is used
> to generate the side input is only updated when a user performs manual
> re-configuration. Because of this I use a trigger on a global window to
> periodically generate a view. Is there a way to circumvent the creation
> of a synchronized processing time trigger?
>
> With many thanks,
>
> Jan
>
> On 2021-02-10 00:57, Xinyu Liu wrote:
>
> > Jan,
> >
> > I tried the latest Beam 2.27 version and ran into the same issue that
> > you saw. I dug a bit deeper, and it was caused by the recent changes in
> > Beam to enable SplittableParDo in all runners. While we are going to
> > work with Beam to get this resolved, you can avoid the issue by adding
> > the argument "--experiments=use_deprecated_read" when running your
> > program. This flag will disable the new code path to make it work as
> > before.
> >
> > I also tried your triggering code in the KafkaWordCount example in the
> > samza-beam-examples git repo
> > (https://github.com/apache/samza-beam-examples).
> > It seems to be working for me, as I can see the 1-second early firing
> > within a 10-second window, and the fired panes are accumulated. You can
> > also use this git repo as a reference.
> >
> > Thanks,
> > Xinyu
> >
> > On Mon, Feb 8, 2021 at 9:41 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
> >
> > Hi, Jan,
> >
> > Thanks for reporting this issue to us. Processing time triggers are
> > supported in the Samza Runner with Beam version 2.22.0 [1]. The
> > exception message wasn't updated after we added support for processing
> > time. Apologies for the confusion here. It looks like most of the
> > exception messages have been fixed in the latest code.
> >
> > From reading the code, it seems we will only run into this exception if
> > we somehow end up with the TimeDomain synchronized_processing_time [2].
> > The Samza runner does not support this time domain. Are you aware of
> > anything in your code that might use it? If not, I can help debug
> > further. We have other users who use processing time triggers for early
> > triggering, and it has been working fine.
> >
> > I will also take a look at 2.27.0. LinkedIn has recently been upgraded
> > to 2.26.0, and we found a few issues. Previously we were using a
> > version close to 2.24.0.
> >
> > Thanks,
> > Xinyu
> >
> > [1]: https://github.com/apache/beam/blob/9b43fadb8bb6f4bcabc945fc299b378eb1d7d205/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java#L347
> > [2]: https://github.com/apache/beam/blob/055140203ce2df56ba903b05266466cf16562dde/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimeDomain.java#L49
> >
> > On Sun, Feb 7, 2021 at 2:24 PM Jan Bensien <stu128...@mail.uni-kiel.de>
> > wrote:
> >
> > Hello,
> >
> > I am currently trying to execute my Beam pipelines using the Samza
> > Runner. I am using processing time triggers to calculate early
> > results for my larger windows.
> > However, I am getting the following error:
> > java.lang.UnsupportedOperationException: class
> > org.apache.beam.runners.samza.SamzaRunner currently only supports event
> > time.
> > Looking at the capability matrix of Beam
> > (https://beam.apache.org/documentation/runners/capability-matrix/),
> > it looks like processing time should be supported.
> > I could not find any documentation of the exact features supported by
> > the different runner versions.
> > I am using version 2.22.0 of the Samza Runner, but I also tried 2.25.0
> > and got the same error. When I tried to upgrade to 2.27.0, I got the
> > following error: java.lang.UnsupportedOperationException:
> > BundleFinalizer unsupported in Samza. This happens whenever I use
> > KafkaIO to read from Kafka, even with a pipeline that does nothing
> > except read from Kafka.
> >
> > The trigger that caused the exception is the following:
> >
> >     .triggering(Repeatedly.forever(
> >         AfterProcessingTime.pastFirstElementInPane()
> >             .plusDelayOf(Duration.standardSeconds(1))))
> >     .accumulatingFiredPanes());
> >
> > Running the pipeline with the Direct Runner worked fine. Which version
> > is the latest stable version of the Samza Runner, and does it support
> > processing time triggers?
> >
> > With many thanks,
> >
> > Jan