Re: Beam/Dataflow pipeline backfill via JDBC
The pipeline works fine with the Direct Runner. The issue appears to be specific to streaming mode on Dataflow. I've updated my pipeline to use Pub/Sub as an input instead, and digging into the Dataflow console, it looks like execution of a particular GroupByKey is moving extremely slowly -- the watermark for the prior step is caught up to real time, but the GroupByKey step's data watermark is currently June 4, 2020 and takes about 10 minutes to advance a single day, which is ridiculously slow. In batch mode, the whole backlog is processed in about 30 minutes.

It is actually a reasonably large and complex pipeline, so I'm not sure where I would even start with code snippets. The particular GroupByKey that seems to be running extremely slowly is "FixedTickGroupByTeam" and is in a part of the pipeline that looks like this -- it really should be processing only a few events per day with an event time of Jun 4th 2020, so something is definitely not right:

val scores = timestampedCheckins
    .apply("FixedTickWindows",
        Window.into(FixedWindows.of(5.minutes.asJoda()))
            // NOTE that we use the default timestamp combiner (end of window) to avoid
            // https://issues.apache.org/jira/browse/BEAM-2262
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1))
            )
            .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .discardingFiredPanes()
    )
    .apply("FixedTickKeyByTeam", WithKeys.of { it.teamId })
    .apply("FixedTickGroupByTeam", GroupByKey.create())
    .apply("GlobalWindowsLoopingStatefulTimer", Window.into(GlobalWindows()))
    .apply("LoopingStatefulTimer",
        ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days))
    )
    // this window and subsequent group by and flatten combines the empty iterable
    // timer output with any actual check-ins
    .apply("FixedTickWindowsPostTimer",
        Window.into(FixedWindows.of(5.minutes.asJoda()))
            .applyWindowingOptions()
    )
    .apply("FixedTickGroupByTeamPostTimer", GroupByKey.create())
    .apply("FixedTickFlattenPostTimer", flattenValues())
    // convert to an explicit PeriodContent type representing either an empty period
    // or a period with check-ins; this allows us to carry forward the timestamp of
    // an empty period, without it being flattened into a single empty
    //.apply("MapToPeriodContent", ParDo.of(MapToPeriodContentFn()))
    .apply("CheckinTimingScoreFn",
        ParDo.of(CheckinTimingScoreFn(scoreCalculationServiceBuilder, checkinStateView))
            .withSideInputs(checkinStateView)
    )

On Wed, May 12, 2021 at 12:43 PM Kenneth Knowles wrote:

> Can you share some more details, such as code? We may identify something
> that relies upon assumptions from batch execution style.
>
> Also notably the Java DirectRunner does not have separate batch/streaming
> mode. It always executes in a "streaming" sort of way. It is also simpler
> in some ways, so if you can reproduce it on the DirectRunner that might help.
>
> Kenn
>
> On Tue, May 11, 2021 at 3:41 PM Raman Gupta wrote:
>
>> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My
>> ideal pipeline backfills its state and output from historical data via the
>> JDBC input, and then continues processing new elements arriving via
>> pub/sub. Conceptually, this seems easy to do with a filter on each source
>> before/after some specific cutoff instant.
>>
>> However, when I add pub/sub into the pipeline, it runs in streaming mode,
>> and the pipeline does not produce the expected results -- all of the
>> results that would be produced based on looping timers seem to be missing.
>>
>> I thought this might be related to the post-inputs Flatten, but I've
>> taken pub/sub out of the equation, and run the same exact JDBC-based
>> pipeline in batch vs streaming mode, and the JDBC-only pipeline in
>> streaming mode produces the same partial results.
>>
>> What could be happening?
>>
>> Regards,
>> Raman
Beam/Dataflow pipeline backfill via JDBC
I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My ideal pipeline backfills its state and output from historical data via the JDBC input, and then continues processing new elements arriving via pub/sub. Conceptually, this seems easy to do with a filter on each source before/after some specific cutoff instant. However, when I add pub/sub into the pipeline, it runs in streaming mode, and the pipeline does not produce the expected results -- all of the results that would be produced based on looping timers seem to be missing. I thought this might be related to the post-inputs Flatten, but I've taken pub/sub out of the equation, and run the same exact JDBC-based pipeline in batch vs streaming mode, and the JDBC-only pipeline in streaming mode produces the same partial results. What could be happening? Regards, Raman
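The cutoff idea described above -- keep historical (JDBC) elements strictly before a cutoff instant and live (Pub/Sub) elements at or after it, so no event is seen twice after the Flatten -- can be sketched outside Beam as a pair of plain-Java filters. This is only an illustration of the partitioning logic; the class and method names are invented for the example, not taken from the actual pipeline:

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the backfill cutoff: the two sources are made
// disjoint by filtering on a single cutoff instant before flattening.
public class CutoffFilter {
    // Historical/JDBC side: keep only events strictly before the cutoff.
    public static List<Instant> keepHistorical(List<Instant> jdbc, Instant cutoff) {
        return jdbc.stream().filter(t -> t.isBefore(cutoff)).collect(Collectors.toList());
    }

    // Live/Pub-Sub side: keep only events at or after the cutoff.
    public static List<Instant> keepLive(List<Instant> pubsub, Instant cutoff) {
        return pubsub.stream().filter(t -> !t.isBefore(cutoff)).collect(Collectors.toList());
    }
}
```

In the real pipeline each filter would be a `Filter.by(...)` on event timestamps applied to the corresponding source before the Flatten.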
Running averages -- but global window state might not be enough
I have a running average problem. As I understand it, the traditional Beam solution is state in a global window, but I'm not quite sure how to approach it for my use case, which is a bit more complex.

I have a "score" output every 5 minutes based on a timer, up to a maximum of 1 hour after some time, depending on the arrival times of a few input events per day. The output of this initial part of the pipeline is 1) versioned, so when running the pipeline in batch mode, or dealing with up to 3-day-late inputs, the score in the output system is continuously updated (and outputs from out-of-order inputs are ignored), and 2) aggregated into a daily score along with inputs coming from other pipeline branches, which is also continuously updated part-way through the day with early and late triggers.

Now, I need to calculate the running average of the individual scores output every 5 minutes multiple times per day, and factor those into the overall aggregated daily score. The running average should consider only the highest-version score for each day on which there are scores.

I don't see how I can do this with global windows without keeping a full history of the latest score and version on every previous day, which will grow without bound. Or am I missing something?

Thanks,
Raman
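One way the per-day bookkeeping above could stay bounded: keep the (version, score) pair only for days still inside the lateness horizon, and once a day can no longer receive late updates, fold its final score into a running (sum, count) and drop it. This is a hedged plain-Java sketch of that idea, not Beam state API code -- all names here are illustrative, and mapping it onto `ValueState`/`MapState` plus a timer that fires at the horizon is left open:

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// Sketch: "latest version wins" per open day, plus a compacted (sum, count)
// for days past the lateness horizon, so state does not grow without bound.
public class RunningAverage {
    public static class Versioned {
        final long version; final double score;
        Versioned(long v, double s) { version = v; score = s; }
    }

    private final Map<LocalDate, Versioned> open = new HashMap<>();
    private double closedSum = 0;
    private long closedCount = 0;

    // Keep only the highest-version score for each still-open day.
    public void observe(LocalDate day, long version, double score) {
        Versioned prev = open.get(day);
        if (prev == null || version > prev.version) open.put(day, new Versioned(version, score));
    }

    // Fold days at or before `horizon` (e.g. watermark minus allowed
    // lateness) into the compact aggregate and forget their per-day state.
    public void close(LocalDate horizon) {
        open.entrySet().removeIf(e -> {
            if (!e.getKey().isAfter(horizon)) {
                closedSum += e.getValue().score;
                closedCount++;
                return true;
            }
            return false;
        });
    }

    // Running average over closed days plus the current best of open days.
    public double average() {
        double sum = closedSum;
        long n = closedCount;
        for (Versioned v : open.values()) { sum += v.score; n++; }
        return n == 0 ? 0 : sum / n;
    }
}
```

The key property is that only days within the 3-day lateness window need their version retained; everything older contributes two numbers total.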
Re: Side windows and late inputs
I'm also having the opposite problem -- my unit test moves the watermark to just before a timestamped input. I was expecting the DoFn to execute for all windows up to that watermark, seeing nothing in the side input. However, that isn't the case -- the test shows that the DoFn's side input, evaluated *before* the watermark, contains the input timestamped *after* the watermark. This makes no sense to me.

Regards,
Raman

On Sat, Apr 10, 2021 at 2:46 AM Raman Gupta wrote:

> I have a topology that looks like this:
>
> input -> 1hr window -> view
>       |
>       -> 5m window -> dofn(view)
>
> IOW -- an input is windowed into both 1 hour windows, as well as 5 minute
> windows. The 1 hour window data is collected into a view.
>
> Meanwhile, the input is also windowed with 5 minute windows, there is some
> processing, and then finally a DoFn that uses the view as a side input.
>
> Both windows allow late data, and fire immediately on a late element. My
> expectation would have been that any processing occurring in the DoFn after
> the processing time of the late element would see the late element in the
> view. However, that doesn't seem to be the case. Why?
>
> Is there another approach I can use that doesn't suffer from this problem?
>
> Regards,
> Raman Gupta
Side windows and late inputs
I have a topology that looks like this:

input -> 1hr window -> view
      |
      -> 5m window -> dofn(view)

IOW -- an input is windowed into both 1 hour windows, as well as 5 minute windows. The 1 hour window data is collected into a view.

Meanwhile, the input is also windowed with 5 minute windows, there is some processing, and then finally a DoFn that uses the view as a side input.

Both windows allow late data, and fire immediately on a late element. My expectation would have been that any processing occurring in the DoFn after the processing time of the late element would see the late element in the view. However, that doesn't seem to be the case. Why?

Is there another approach I can use that doesn't suffer from this problem?

Regards,
Raman Gupta
Re: Triggering partway through a window
On Mon, Mar 29, 2021 at 1:17 PM Kenneth Knowles wrote:

> That's a neat example!
>
> The trigger you have there will emit a ton of output. What is your
> accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
> you would not actually have access to the prior 6 days of input.

Yup, accumulating.

> The only trigger that is based on completeness of data is the
> AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
> capture the 6 days of data:
>
> prior6days = input.apply(Window.into(<6 day windows sliding one
> day>).triggering(AfterWatermark.pastEndOfWindow()))
>
> Now if you GBK this collection, each group will have a timestamp that is
> the end of the 6 day period. You can use ParDo with outputWithTimestamp to
> move the timestamp up to any timestamp in the following day, yielding a
> PCollection of 6 day grouping of data with a timestamp in the last day of
> the 7. If the 6 days of data is large you may hit size limits (either hard
> limits or perf problems) and have to do something fancier.
>
> Flatten this with the input PCollection and window into FixedWindows(<1
> day>) and trigger however you like, again with accumulatingFiredPanes().
> There is no guarantee that the 6 days of past data arrives prior to
> elements in the last day. In fact since it will be delayed by an extra
> shuffle you would expect it to often show up later. So this is a heuristic
> approach equivalent to what it sounds like you are already doing that
> should lower the cost.

Ah interesting. Yes, this would likely have worked for me.

> If you want a guarantee that the 6 day buffer arrives prior to the other
> elements you will need to do something else. You could write a WindowFn
> that assigned all 7 days of data to a window that only spanned the first 6
> days, then trigger at end of window plus allowing late data (no early
> firings).
Then every firing would be guaranteed by the watermark to have > the first 6 days of data plus whatever else has shown up. (I assume part of > your spec is that you do want data to be processed as it arrives, versus > waiting until the end of the 7 day window). > I was curious about this option, and tried it. One issue I ran into was that the downstream logic had some "odd" windows to deal with because the window interval did not properly reflect its contents, which resulted in some downstream logic that wasn't as encapsulated as it should be. I therefore created a PTransform "DailyWindowsWithContext" that first does the contextual windowing ("ContextualCalendarDayWindow") and GBK. It then "re-windows" the elements + their context by filtering out the "context-only" groups, sets the timestamps of the remaining groups based on the max of the element timestamps, and then outputs them into a fixed daily window, followed by another GBK and flatten. This seems to work quite well with my set of unit tests, though I haven't used it extensively yet. If anyone is curious about the code (written in Kotlin), see here: https://gist.github.com/rocketraman/543f066813fc89590f23ff5dacf43f01 Feedback on this code would be more than welcome. Regards, Raman > > I am just writing this without coding, so I could certainly have missed > something or gotten it wrong. > > Kenn > > On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta wrote: > >> I have a 7-day sliding calendar window, sliding by 1 day. The intent is >> to process only elements that fall into the last day of a window, but still >> have access to the elements from the preceding six days. 
>> >> I created a sliding calendar window function, and trigger it like this: >> >> AfterWatermark.pastEndOfWindow() >> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()) >> .withLateFirings(AfterPane.elementCountAtLeast(1)) >> >> Downstream of this pipeline I have a GBK and a DoFn that basically >> ignores elements until at least some of them are in the last day of >> the window. >> >> The above trigger works and the pipeline produces the expected output, >> but runs the GBK and downstream logic many more times than is necessary. >> >> Is there a way I can optimize the triggering here such that the early >> firings begin only when the watermark moves into the last day of the 7-day >> window? >> >> Thanks, >> Raman >> >>
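The custom-WindowFn idea suggested in this thread -- assign all 7 days of data to a window that spans only the first 6 days, so the watermark-based firing is guaranteed to contain the full 6-day prefix and day-7 elements arrive as late data -- can be modeled with a pure function. This is a plain-Java illustration of the window-assignment math only, not a Beam `WindowFn` implementation; class and method names are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

// Model of "shortened" sliding windows: each element on day `d` belongs to
// the 7 sliding windows containing it, but every window's interval covers
// only its first 6 days. An element falling on a window's 7th day therefore
// lands at-or-after the window end and arrives as late data there.
public class ShortenedWindows {
    public static class W {
        public final long start, end; // [start, start + 6) in whole days
        public W(long start) { this.start = start; this.end = start + 6; }
    }

    // All 7 sliding (by one day) windows whose 7-day span contains `day`.
    public static List<W> assign(long day) {
        List<W> out = new ArrayList<>();
        for (long s = day - 6; s <= day; s++) out.add(new W(s));
        return out;
    }

    // An element is "late" for a window once its day reaches the shortened end.
    public static boolean isLateFor(long day, W w) { return day >= w.end; }
}
```

A real implementation would subclass `NonMergingWindowFn` and return `IntervalWindow`s with the shortened end; the triggering would then be end-of-window with allowed lateness and no early firings, as described above.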
Re: Triggering partway through a window
I experimented with `AfterProcessingTime.pastFirstElementInPane().plusDelayOf(6.days)` which is *almost* working, but delays elements on the last day depending on the element timestamps from the first day. I think I'd need something like `AfterProcessingTime.pastStartOfWindow().plusDelayOf(6.days)`, but I'm finding it difficult to understand how to create such a trigger, or if something like that is even possible. Regards, Raman On Fri, Mar 26, 2021 at 4:47 PM Raman Gupta wrote: > I have a 7-day sliding calendar window, sliding by 1 day. The intent is to > process only elements that fall into the last day of a window, but still > have access to the elements from the preceding six days. > > I created a sliding calendar window function, and trigger it like this: > > AfterWatermark.pastEndOfWindow() > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()) > .withLateFirings(AfterPane.elementCountAtLeast(1)) > > Downstream of this pipeline I have a GBK and a DoFn that basically ignores > elements until at least some of them are in the last day of the window. > > The above trigger works and the pipeline produces the expected output, but > runs the GBK and downstream logic many more times than is necessary. > > Is there a way I can optimize the triggering here such that the early > firings begin only when the watermark moves into the last day of the 7-day > window? > > Thanks, > Raman > >
Triggering partway through a window
I have a 7-day sliding calendar window, sliding by 1 day. The intent is to process only elements that fall into the last day of a window, but still have access to the elements from the preceding six days.

I created a sliding calendar window function, and trigger it like this:

AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
    .withLateFirings(AfterPane.elementCountAtLeast(1))

Downstream of this pipeline I have a GBK and a DoFn that basically ignores elements until at least some of them are in the last day of the window.

The above trigger works and the pipeline produces the expected output, but runs the GBK and downstream logic many more times than is necessary.

Is there a way I can optimize the triggering here such that the early firings begin only when the watermark moves into the last day of the 7-day window?

Thanks,
Raman
Re: Looping timer, global windows, and direct runner
Thanks guys for all the help. I think the late-passthrough solution should work for my use case.

On Wed, Jan 13, 2021 at 10:27 AM Reza Ardeshir Rokni wrote:

> Hi,
>
> Late data, in general, can be problematic for the looping timer pattern,
> as well as other use cases, where the arrival of data in the @process
> function creates an EventTime domain Timer. The solution you have, which
> essentially passes it through, is a nice option; another solution would be
> to deal with the late data in a branch upstream of the DoFn where timer
> work is happening (via a multi output). The former is good if your
> downstream transforms are not expecting specific behaviours from the
> looping timer characteristics. The latter is often easier to use when you
> want to write your downstream transforms without having to think about
> dealing with late data.
>
> Cheers
>
> Reza
>
> On Wed, 13 Jan 2021 at 23:12, Jan Lukavský wrote:
>
>> I think there still could be problems in some corner cases. The problem
>> is, that elements considered 'late' in timestamp combiner have different
>> definition than what is marked as late in PaneInfo. So you can have a
>> corner case, when PaneInfo would be ON_TIME, but the timestamp would still
>> be shifted to end of window. This would probably not be too often, but it
>> can happen. If it is fine for your use case, then this could work.
>>
>> Jan
>> On 1/13/21 3:59 PM, Raman Gupta wrote:
>>
>> Hmm, I think I've found a simple solution... adding this to the beginning
>> of my looping timer @ProcessElement function:
>>
>> // late elements don't need to affect our looping timer,
>> // pass them through without modification
>> // this is kind of a work-around for
>> // https://issues.apache.org/jira/browse/BEAM-2262
>> // but I think makes sense in general for looping timers when
>> // there is no need to trigger timers after the window is done
>> if (paneInfo.timing == PaneInfo.Timing.LATE) {
>>     receiver.output(element)
>>     return
>> }
>>
>> At least all my unit tests are passing... is there any problem with this
>> approach?
>>
>> Thanks,
>> Raman
>>
>> On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta wrote:
>>
>>> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
>>> replies below.
>>>
>>> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský wrote:
>>>
>>>> Hi,
>>>>
>>>> yes, there is a possible non-determinism, that is related to the
>>>> timestamp combiner. Timestamp combiners combine only elements, that are not
>>>> 'late' ([1]), meaning that their timestamp is not preceding output
>>>> watermark of the GBK. Looking at the pipeline code I suppose that could be
>>>> the cause.
>>>
>>> Yes, the test stream in this test case does indeed send the element in
>>> question "late". Here is the setup:
>>>
>>> val base = Instant.EPOCH + 6.hours
>>> val xStream: TestStream = TestStream.create(coder)
>>>     .addElements(x["1"]) // this just initializes the looping timer
>>>     // advance watermark past end of window that would normally process x2
>>>     .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>>>     .addElements(x["2"]) // now we see the element
>>>     .advanceWatermarkToInfinity()
>>>
>>> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
>>> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>>>
>>> So I get your point that the timestamp combiner is not used for late
>>> elements, but if late elements are singly emitted as in this pipeline, why
>>> do any timestamp modification at all? I would expect them to arrive with
>>> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
>>> to 1970-01-01T07:34:59.999Z (this is the part that seems
>>> non-deterministic). What is the logic / reason behind the pipeline setting
>>> this element's timestamp to 1970-01-01T07:34:59.999Z?
>>>
>>>> You can make the pipeline deterministic by using
>>>> TimestampCombiner.END_OF_WINDOW (default).
>>>
>>> It's definitely not ideal for this use case, but I'll consider it.
>>>
>>>> If you *need* to use the TimestampCombiner.EARLIEST, you can probably
>>>> do that by tweaking the looping timer stateful dofn and fix timestamps
>>>> there (using timer output timestamp).
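The "multi output" alternative mentioned in this thread -- route late elements to a separate branch upstream of the timer DoFn instead of passing them through inside it -- amounts to a partition on lateness. A plain-Java model of that split (illustrative only; in Beam this would be a DoFn with a main and an additional `TupleTag` output, judging lateness from the pane or the element timestamp versus the watermark):

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Model of the two-branch split: index 0 holds on-time elements that feed
// the looping-timer DoFn, index 1 holds late elements handled separately,
// so the timer logic never has to reason about late data at all.
public class LateSplit {
    public static List<List<Instant>> split(List<Instant> elements, Instant watermark) {
        List<Instant> onTime = new ArrayList<>();
        List<Instant> late = new ArrayList<>();
        for (Instant t : elements) {
            // An element whose timestamp precedes the watermark is late.
            (t.isBefore(watermark) ? late : onTime).add(t);
        }
        return List.of(onTime, late);
    }
}
```

The trade-off matches what the thread describes: the passthrough keeps the topology simpler, while the upstream split keeps downstream transforms free of late-data handling.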
Re: Looping timer, global windows, and direct runner
Hmm, I think I've found a simple solution... adding this to the beginning of my looping timer @ProcessElement function:

// late elements don't need to affect our looping timer,
// pass them through without modification
// this is kind of a work-around for https://issues.apache.org/jira/browse/BEAM-2262
// but I think makes sense in general for looping timers when
// there is no need to trigger timers after the window is done
if (paneInfo.timing == PaneInfo.Timing.LATE) {
    receiver.output(element)
    return
}

At least all my unit tests are passing... is there any problem with this approach?

Thanks,
Raman

On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta wrote:

> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
> replies below.
>
> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský wrote:
>
>> Hi,
>>
>> yes, there is a possible non-determinism, that is related to the
>> timestamp combiner. Timestamp combiners combine only elements, that are not
>> 'late' ([1]), meaning that their timestamp is not preceding output
>> watermark of the GBK. Looking at the pipeline code I suppose that could be
>> the cause.
>
> Yes, the test stream in this test case does indeed send the element in
> question "late". Here is the setup:
>
> val base = Instant.EPOCH + 6.hours
> val xStream: TestStream = TestStream.create(coder)
>     .addElements(x["1"]) // this just initializes the looping timer
>     // advance watermark past end of window that would normally process x2
>     .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>     .addElements(x["2"]) // now we see the element
>     .advanceWatermarkToInfinity()
>
> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>
> So I get your point that the timestamp combiner is not used for late
> elements, but if late elements are singly emitted as in this pipeline, why
> do any timestamp modification at all? I would expect them to arrive with
> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
> to 1970-01-01T07:34:59.999Z (this is the part that seems
> non-deterministic). What is the logic / reason behind the pipeline setting
> this element's timestamp to 1970-01-01T07:34:59.999Z?
>
>> You can make the pipeline deterministic by using
>> TimestampCombiner.END_OF_WINDOW (default).
>
> It's definitely not ideal for this use case, but I'll consider it.
>
>> If you *need* to use the TimestampCombiner.EARLIEST, you can probably do
>> that by tweaking the looping timer stateful dofn and fix timestamps there
>> (using timer output timestamp).
>
> I had already tried that but the pipeline throws an error that the
> timestamp emitted cannot be earlier than the current element timestamp.
>
> Thanks,
> Raman
>
>> Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>
>> Your reply made me realize I removed the condition from my local copy of
>> the looping timer that brings the timer forward if it encounters an earlier
>> element later in the stream:
>>
>> if (currentTimerValue == null ||
>>     currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>
>> Restoring that condition fixes that issue.
>>
>> However, the reason I removed that condition in the first place was
>> because it was making a unit test non-deterministic -- sometimes the
>> element timestamps into the looping timer didn't seem to match the element
>> timestamps according to the EARLIEST timestamp combiner defined, causing
>> the timer to execute an additional time.
>>
>> The pipeline:
>>
>> input
>>     // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>>     // https://issues.apache.org/jira/browse/BEAM-644
>>     .apply("XTimestamps", WithTimestamps
>>         .of { it.enteredAt.asJoda() }
>>         .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>>     )
>>     .apply("FixedTickWindows",
>>         Window.into(FixedWindows.of(5.minutes.asJoda()))
>>             .triggering(
>>                 AfterWatermark.pastEndOfWindow()
>>                     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>                     .withLateFirings(AfterPane.elementCountAtLeast(1))
>>             )
>>             .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>>             .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>>             .discardingFiredPanes()
>>             .withTimestampCombiner(TimestampCombiner.EARLIEST)
>>     )
Re: Looping timer, global windows, and direct runner
(Replying to Reza) Yes, I am using TestStream for my unit test. Other replies below.

On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský wrote:

> Hi,
>
> yes, there is a possible non-determinism, that is related to the timestamp
> combiner. Timestamp combiners combine only elements, that are not 'late'
> ([1]), meaning that their timestamp is not preceding output watermark of
> the GBK. Looking at the pipeline code I suppose that could be the cause.

Yes, the test stream in this test case does indeed send the element in question "late". Here is the setup:

val base = Instant.EPOCH + 6.hours
val xStream: TestStream = TestStream.create(coder)
    .addElements(x["1"]) // this just initializes the looping timer
    // advance watermark past end of window that would normally process x2
    .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
    .addElements(x["2"]) // now we see the element
    .advanceWatermarkToInfinity()

Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.

So I get your point that the timestamp combiner is not used for late elements, but if late elements are singly emitted as in this pipeline, why do any timestamp modification at all? I would expect them to arrive with their original timestamp, not one changed from 1970-01-01T07:30:00.000Z to 1970-01-01T07:34:59.999Z (this is the part that seems non-deterministic). What is the logic / reason behind the pipeline setting this element's timestamp to 1970-01-01T07:34:59.999Z?

> You can make the pipeline deterministic by using
> TimestampCombiner.END_OF_WINDOW (default).

It's definitely not ideal for this use case, but I'll consider it.

> If you *need* to use the TimestampCombiner.EARLIEST, you can probably do
> that by tweaking the looping timer stateful dofn and fix timestamps there
> (using timer output timestamp).

I had already tried that but the pipeline throws an error that the timestamp emitted cannot be earlier than the current element timestamp.

Thanks,
Raman

> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-2262
> On 1/12/21 5:26 PM, Raman Gupta wrote:
>
> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
> if (currentTimerValue == null ||
>     currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
>
> Restoring that condition fixes that issue.
>
> However, the reason I removed that condition in the first place was
> because it was making a unit test non-deterministic -- sometimes the
> element timestamps into the looping timer didn't seem to match the element
> timestamps according to the EARLIEST timestamp combiner defined, causing
> the timer to execute an additional time.
>
> The pipeline:
>
> input
>     // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>     // https://issues.apache.org/jira/browse/BEAM-644
>     .apply("XTimestamps", WithTimestamps
>         .of { it.enteredAt.asJoda() }
>         .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>     )
>     .apply("FixedTickWindows",
>         Window.into(FixedWindows.of(5.minutes.asJoda()))
>             .triggering(
>                 AfterWatermark.pastEndOfWindow()
>                     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>                     .withLateFirings(AfterPane.elementCountAtLeast(1))
>             )
>             .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>             .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>             .discardingFiredPanes()
>             .withTimestampCombiner(TimestampCombiner.EARLIEST)
>     )
>     .apply("KeyByUser", WithKeys.of { it.userId })
>     .apply("GroupByUser", GroupByKey.create())
>     .apply("GlobalWindowsLoopingStatefulTimer",
>         Window.into(GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes()
>             .withTimestampCombiner(TimestampCombiner.EARLIEST)
>     )
>     .apply("LoopingStatefulTimer",
>         ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))
>
> The looping timer receives an @Timestamp value in the process function of:
>
> 1970-01-01T07:34:59.999Z
>
> but the earliest timestamp of the (single) element in the elements
> iterable is:
>
> 1970-01-01T07:30:00.000Z
>
> I would have thought given my timestamp combiners on my windows that the
> timestamp should have been 07:30:00.000Z. Is there something wrong in my
> pipeline that is causing this non-deterministic behavior?
Re: Looping timer, global windows, and direct runner
Your reply made me realize I removed the condition from my local copy of the looping timer that brings the timer forward if it encounters an earlier element later in the stream:

if (currentTimerValue == null ||
    currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {

Restoring that condition fixes that issue.

However, the reason I removed that condition in the first place was because it was making a unit test non-deterministic -- sometimes the element timestamps into the looping timer didn't seem to match the element timestamps according to the EARLIEST timestamp combiner defined, causing the timer to execute an additional time.

The pipeline:

input
    // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
    // https://issues.apache.org/jira/browse/BEAM-644
    .apply("XTimestamps", WithTimestamps
        .of { it.enteredAt.asJoda() }
        .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
    )
    .apply("FixedTickWindows",
        Window.into(FixedWindows.of(5.minutes.asJoda()))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                    .withLateFirings(AfterPane.elementCountAtLeast(1))
            )
            .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
            .discardingFiredPanes()
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
    )
    .apply("KeyByUser", WithKeys.of { it.userId })
    .apply("GroupByUser", GroupByKey.create())
    .apply("GlobalWindowsLoopingStatefulTimer",
        Window.into(GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes()
            .withTimestampCombiner(TimestampCombiner.EARLIEST)
    )
    .apply("LoopingStatefulTimer",
        ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))

The looping timer receives an @Timestamp value in the process function of:

1970-01-01T07:34:59.999Z

but the earliest timestamp of the (single) element in the elements iterable is:

1970-01-01T07:30:00.000Z

I would have thought given my timestamp combiners on my windows that the timestamp should have been 07:30:00.000Z. Is there something wrong in my pipeline that is causing this non-deterministic behavior?

Thanks,
Raman

On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský wrote:

> Hi Raman,
>
> can you share the details of the pipeline? How exactly are you using the
> looping timer? Timer as described in the linked blog post should be
> deterministic even when the order of the input elements is undefined.
> Does your logic depend on element ordering?
>
> Jan
>
> On 1/12/21 3:18 PM, Raman Gupta wrote:
> > Hello, I am building and testing a pipeline with the direct runner.
> > The pipeline includes a looping timer -
> > https://beam.apache.org/blog/looping-timers/.
> >
> > For now, I am using JdbcIO to obtain my input data, though when put
> > into production the pipeline will use PubSubIO.
> >
> > I am finding that the looping timer begins producing outputs at a
> > random event time, which makes some sense given the randomization of
> > inputs in the direct runner. However, this makes the results of
> > executing my pipeline with the direct runner completely non-deterministic.
> >
> > So:
> >
> > 1) Is there a way to turn off this non-deterministic behavior, but
> > just for the GlobalWindow / LoopingTimer?
> >
> > 2) Perhaps alternatively, is there a way to "initialize" the looping
> > timer when the pipeline starts, rather than when it first sees an
> > element? Perhaps a side input?
> >
> > 3) Am I right in assuming that when I move this pipeline to pub/sub io
> > and operate it in streaming mode, this issue will go away?
> >
> > Thanks!
> > Raman
Looping timer, global windows, and direct runner
Hello, I am building and testing a pipeline with the direct runner. The pipeline includes a looping timer -- https://beam.apache.org/blog/looping-timers/. For now, I am using JdbcIO to obtain my input data, though when put into production the pipeline will use PubSubIO.

I am finding that the looping timer begins producing outputs at a random event time, which makes some sense given the randomization of inputs in the direct runner. However, this makes the results of executing my pipeline with the direct runner completely non-deterministic.

So:

1) Is there a way to turn off this non-deterministic behavior, but just for the GlobalWindow / LoopingTimer?

2) Perhaps alternatively, is there a way to "initialize" the looping timer when the pipeline starts, rather than when it first sees an element? Perhaps a side input?

3) Am I right in assuming that when I move this pipeline to pub/sub io and operate it in streaming mode, this issue will go away?

Thanks!
Raman
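The looping-timer pattern from the blog post boils down to two state transitions, and the thread later in this archive shows that dropping the "only move the timer earlier" guard is what makes the output order-dependent. A plain-Java model of those transitions (illustrative only; in Beam these would live in a stateful DoFn's @ProcessElement and @OnTimer methods, and the interval/stop parameters here are invented for the example):

```java
// Model of the looping-timer state machine: an element arms the timer at the
// end of its interval, but only if that is earlier than the current timer
// (this guard is what keeps the result independent of element order); each
// firing re-arms one interval later until the stop time is reached.
public class LoopingTimerModel {
    // Returns the new timer value after seeing an element at elementTs.
    public static Long onElement(Long currentTimer, long elementTs, long intervalMs) {
        // End of the fixed interval containing the element.
        long next = elementTs - (elementTs % intervalMs) + intervalMs;
        return (currentTimer == null || currentTimer > next) ? next : currentTimer;
    }

    // Returns the next firing time after a fire, or null to stop looping.
    public static Long onFire(long firedAt, long intervalMs, long stopAt) {
        long next = firedAt + intervalMs;
        return next <= stopAt ? next : null;
    }
}
```

With the guard in place, an earlier element seen later in the stream pulls the timer back rather than being ignored, which is why the pattern stays deterministic under the direct runner's input shuffling.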
Mocking business logic injected into DoFns
Hello, I have some DoFns with properties containing business-logic classes that are injected at pipeline construction time. When testing these pipelines, I would like to mock this business logic, as it is quite complex and easier to test independently of the pipeline. However, my mocking framework does not generate serializable mocks, and my test therefore fails with a NotSerializableException. Is there a way to relax this requirement for unit tests?
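One common workaround (sketched here as an assumption, not a Beam API): have the DoFn hold a Serializable supplier instead of the business-logic object itself. In tests, the supplier carries only a registry key; the non-serializable mock lives in a static map, so it is never serialized. This relies on the test running in a single JVM (true for the DirectRunner); all names below are invented for the illustration:

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Registry-backed indirection: the serialized object is just a String key,
// and the mock is looked up from static state on the worker (same JVM in
// DirectRunner tests), sidestepping the serializability requirement.
public class MockRegistry {
    private static final Map<String, Object> REGISTRY = new ConcurrentHashMap<>();

    public static <T> SerializableSupplier<T> register(String key, T instance) {
        REGISTRY.put(key, instance);
        return new SerializableSupplier<>(key);
    }

    public static class SerializableSupplier<T> implements Supplier<T>, Serializable {
        private final String key;
        SerializableSupplier(String key) { this.key = key; }
        @SuppressWarnings("unchecked")
        public T get() { return (T) REGISTRY.get(key); }
    }
}
```

The DoFn would then take a `Supplier<BusinessLogic> & Serializable` in its constructor and call `get()` lazily in `@Setup` or `@ProcessElement`; production code passes a supplier that constructs the real object, tests pass a registry-backed one wrapping the mock.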