Re: Beam/Dataflow pipeline backfill via JDBC

2021-05-12 Thread Raman Gupta
The pipeline works fine with the Direct Runner. The issue appears to be
specific to streaming mode on Dataflow. I've updated my pipeline to use
Pub/Sub as an input instead, and digging into the Dataflow console, it
looks like execution of a particular GroupByKey is moving extremely slowly
-- the watermark for the prior step is caught up to real time, but the
GroupByKey step data watermark is currently June 4, 2020 and takes about 10
minutes to advance a single day, which is ridiculously slow. In batch mode,
the whole backlog is processed in about 30 minutes.

It is a reasonably large and complex pipeline, so I'm not sure where I
would even start with code snippets. The particular GroupByKey
that seems to be running extremely slowly is "FixedTickGroupByTeam" and is
in a part of the pipeline that looks like this -- it really should be
processing only a few events per day with an event time of Jun 4th 2020, so
something is definitely not right:

val scores = timestampedCheckins
  .apply("FixedTickWindows",
    Window.into(FixedWindows.of(5.minutes.asJoda()))
      // NOTE that we use the default timestamp combiner (end of window)
      // to avoid https://issues.apache.org/jira/browse/BEAM-2262
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withLateFirings(AfterPane.elementCountAtLeast(1))
      )
      .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
      .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
      .withTimestampCombiner(TimestampCombiner.LATEST)
      .discardingFiredPanes()
  )
  .apply("FixedTickKeyByTeam", WithKeys.of { it.teamId })
  .apply("FixedTickGroupByTeam", GroupByKey.create())
  .apply("GlobalWindowsLoopingStatefulTimer",
    Window.into(GlobalWindows())
  )
  .apply("LoopingStatefulTimer",
    ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days))
  )
  // this window and subsequent group by and flatten combines the
  // empty iterable timer output with any actual check-ins
  .apply("FixedTickWindowsPostTimer",
    Window.into<KV<String, Iterable<Checkin>>>(FixedWindows.of(5.minutes.asJoda()))
      .applyWindowingOptions()
  )
  .apply("FixedTickGroupByTeamPostTimer", GroupByKey.create())
  .apply("FixedTickFlattenPostTimer", flattenValues())
  // convert to an explicit PeriodContent type representing either an
  // empty period or a period with check-ins
  // this allows us to carry forward the timestamp of an empty period,
  // without it being flattened into a single empty
  //.apply("MapToPeriodContent", ParDo.of(MapToPeriodContentFn()))
  .apply("CheckinTimingScoreFn",
    ParDo.of(CheckinTimingScoreFn(scoreCalculationServiceBuilder, checkinStateView))
      .withSideInputs(checkinStateView)
  )

On Wed, May 12, 2021 at 12:43 PM Kenneth Knowles  wrote:

> Can you share some more details, such as code? We may identify something
> that relies upon assumptions from batch execution style.
>
> Also notably the Java DirectRunner does not have separate batch/streaming
> mode. It always executes in a "streaming" sort of way. It is also simpler
> in some ways so if you can reproduce it on the DirectRunner that might help.
>
> Kenn
>
> On Tue, May 11, 2021 at 3:41 PM Raman Gupta  wrote:
>
>> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My
>> ideal pipeline backfills its state and output from historical data via the
>> JDBC input, and then continues processing new elements arriving via
>> pub/sub. Conceptually, this seems easy to do with a filter on each source
>> before/after some specific cutoff instant.
>>
>> However, when I add pub/sub into the pipeline, it runs in streaming mode,
>> and the pipeline does not produce the expected results -- all of the
>> results that would be produced based on looping timers seem to be missing.
>>
>> I thought this might be related to the post-inputs Flatten, but I've
>> taken pub/sub out of the equation, and run the same exact JDBC-based
>> pipeline in batch vs streaming mode, and the JDBC-only pipeline in
>> streaming mode produces the same partial results.
>>
>> What could be happening?
>>
>> Regards,
>> Raman
>>
>>


Beam/Dataflow pipeline backfill via JDBC

2021-05-11 Thread Raman Gupta
I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My ideal
pipeline backfills its state and output from historical data via the JDBC
input, and then continues processing new elements arriving via pub/sub.
Conceptually, this seems easy to do with a filter on each source
before/after some specific cutoff instant.
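
Concretely, the filter I have in mind looks roughly like this (sketch only;
Checkin, enteredAt, and the two source PCollections are placeholder names
for my actual types):

val cutoff = Instant.parse("2021-05-01T00:00:00Z")

val historical = jdbcCheckins
  .apply("HistoricalOnly", Filter.by(SerializableFunction<Checkin, Boolean> {
    it.enteredAt.isBefore(cutoff)
  }))
val live = pubsubCheckins
  .apply("LiveOnly", Filter.by(SerializableFunction<Checkin, Boolean> {
    !it.enteredAt.isBefore(cutoff)
  }))

// merge the bounded backfill with the unbounded live stream
val merged = PCollectionList.of(historical).and(live)
  .apply("FlattenSources", Flatten.pCollections())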

However, when I add pub/sub into the pipeline, it runs in streaming mode,
and the pipeline does not produce the expected results -- all of the
results that would be produced based on looping timers seem to be missing.

I thought this might be related to the post-inputs Flatten, but I've taken
pub/sub out of the equation, and run the same exact JDBC-based pipeline in
batch vs streaming mode, and the JDBC-only pipeline in streaming mode
produces the same partial results.

What could be happening?

Regards,
Raman


Running averages -- but global window state might not be enough

2021-04-20 Thread Raman Gupta
I have a running average problem. As I understand it, the traditional Beam
solution is state in a global window, but I'm not quite sure how to
approach it for my use case, which is a bit more complex.
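
For concreteness, I understand that traditional approach to be a stateful
DoFn along these lines (sketch only, not my actual code; the String key and
Double score types are placeholders):

// applied to a keyed PCollection in the global window
class RunningMeanFn : DoFn<KV<String, Double>, KV<String, Double>>() {

  @StateId("sum")
  private val sumSpec: StateSpec<ValueState<Double>> = StateSpecs.value()

  @StateId("count")
  private val countSpec: StateSpec<ValueState<Long>> = StateSpecs.value()

  @ProcessElement
  fun process(
    c: ProcessContext,
    @StateId("sum") sum: ValueState<Double>,
    @StateId("count") count: ValueState<Long>
  ) {
    // fold the new score into the per-key running totals
    val score = requireNotNull(c.element().value)
    val newSum = (sum.read() ?: 0.0) + score
    val newCount = (count.read() ?: 0L) + 1
    sum.write(newSum)
    count.write(newCount)
    // emit the running mean seen so far for this key
    c.output(KV.of(c.element().key, newSum / newCount))
  }
}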

I have a "score" output every 5 minutes based on a timer, up to a maximum
of 1 hour after some time, depending on the arrival times of a few input
events per day.

The output of this initial part of the pipeline is 1) versioned, so that
when running the pipeline in batch mode, or dealing with up to 3-day-late
inputs, the score in the output system is continuously updated (and outputs
from out-of-order inputs are ignored); and 2) aggregated into a daily score,
along with inputs coming from other pipeline branches, which is also
continuously updated part-way through the day with early and late triggers.

Now, I need to calculate the running average of the individual scores
output every 5 minutes multiple times per day, and factor those into the
overall aggregated daily score. The running average should consider only
the highest-version score for each day on which there are scores.

I don't see how I can do this with global windows without keeping a full
history of the latest score and version on every previous day, which will
grow without bound. Or am I missing something?

Thanks,
Raman


Re: Side windows and late inputs

2021-04-10 Thread Raman Gupta
I'm also having the opposite problem -- my unit test moves the watermark to
just before a timestamped input. I was expecting the DoFn to execute for
all windows up to that watermark, seeing nothing in the side input.

However, that isn't the case -- the test shows that the DoFn's side input
*before* the watermark contains the input timestamped *after* the
watermark. This makes no sense to me.

Regards,
Raman


On Sat, Apr 10, 2021 at 2:46 AM Raman Gupta  wrote:

> I have a topology that looks like this:
>
> input -> 1hr window -> view
>   |
>   -> 5m window -> dofn(view)
>
> IOW -- an input is windowed into both 1 hour windows, as well as 5 minute
> windows. The 1 hour window data is collected into a view.
>
> Meanwhile, the input is also windowed with 5 minute windows, there is some
> processing, and then finally a DoFn that uses the view as a side input.
>
> Both windows allow late data, and fire immediately on a late element. My
> expectation would have been that any processing occurring in the DoFn after
> the processing time of the late element would see the late element in the
> view. However, that doesn't seem to be the case. Why?
>
> Is there another approach I can use that doesn't suffer from this problem?
>
> Regards,
> Raman Gupta
>
>


Side windows and late inputs

2021-04-09 Thread Raman Gupta
I have a topology that looks like this:

input -> 1hr window -> view
  |
  -> 5m window -> dofn(view)

IOW -- an input is windowed into both 1 hour windows, as well as 5 minute
windows. The 1 hour window data is collected into a view.

Meanwhile, the input is also windowed with 5 minute windows, there is some
processing, and then finally a DoFn that uses the view as a side input.
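
In code, the two branches look roughly like this (sketch only; Event and
MyFn stand in for the real types, and the exact lateness/accumulation
settings are placeholders):

val view = input
  .apply("1hrWindow", Window.into<Event>(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardDays(1))
    .accumulatingFiredPanes())
  .apply("CollectView", View.asList())

input
  .apply("5mWindow", Window.into<Event>(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardDays(1))
    .accumulatingFiredPanes())
  .apply("DoFnWithView", ParDo.of(MyFn(view)).withSideInputs(view))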

Both windows allow late data, and fire immediately on a late element. My
expectation would have been that any processing occurring in the DoFn after
the processing time of the late element would see the late element in the
view. However, that doesn't seem to be the case. Why?

Is there another approach I can use that doesn't suffer from this problem?

Regards,
Raman Gupta


Re: Triggering partway through a window

2021-04-06 Thread Raman Gupta
On Mon, Mar 29, 2021 at 1:17 PM Kenneth Knowles  wrote:

> That's a neat example!
>
> The trigger you have there will emit a ton of output. What is your
> accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
> you would not actually have access to the prior 6 days of input.
>

Yup, accumulating.


>
> The only trigger that is based on completeness of data is the
> AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
> capture the 6 days of data:
>
> prior6days = input.apply(Window.into(<6 day windows sliding one
> day>).triggering(AfterWatermark.pastEndOfWindow()))
>
> Now if you GBK this collection, each group will have a timestamp that is
> the end of the 6 day period. You can use ParDo with outputWithTimestamp to
> move the timestamp up to any timestamp in the following day, yielding a
> PCollection of 6-day groupings of data with a timestamp in the last day of
> the 7. If the 6 days of data is large you may hit size limits (either hard
> limits or perf problems) and have to do something fancier.
>
> Flatten this with the input PCollection and window into FixedWindows(<1
> day>) and trigger however you like, again with accumulatingFiredPanes().
> There is no guarantee that the 6 days of past data arrives prior to
> elements in the last day. In fact since it will be delayed by an extra
> shuffle you would expect it to often show up later. So this is a heuristic
> approach equivalent to what it sounds like you are already doing that
> should lower the cost.
>

Ah interesting. Yes, this would likely have worked for me.
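
For the archives, my reading of Kenn's re-timestamping step is roughly this
(untested sketch; Event, the keying, and the window sizes are placeholders):

val prior6days = input
  .apply("Sliding6Day", Window.into<Event>(
      SlidingWindows.of(Duration.standardDays(6)).every(Duration.standardDays(1)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
  .apply("KeyAll", WithKeys.of { _: Event -> "all" })
  .apply("Group6Days", GroupByKey.create())
  .apply("ShiftIntoDay7",
    ParDo.of(object : DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>() {
      @ProcessElement
      fun process(c: ProcessContext) {
        // the GBK output timestamp is the end of the 6-day window; moving it
        // forward by 1 ms places the group inside the following (7th) day
        c.outputWithTimestamp(c.element(), c.timestamp().plus(Duration.millis(1)))
      }
    }))

// then Flatten prior6days with the raw input and window into daily FixedWindows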


>
> If you want a guarantee that the 6 day buffer arrives prior to the other
> elements you will need to do something else. You could write a WindowFn
> that assigned all 7 days of data to a window that only spanned the first 6
> days, then trigger at end of window plus allowing late data (no early
> firings). Then every firing would be guaranteed by the watermark to have
> the first 6 days of data plus whatever else has shown up. (I assume part of
> your spec is that you do want data to be processed as it arrives, versus
> waiting until the end of the 7 day window).
>

I was curious about this option, and tried it. One issue I ran into was
that the downstream logic had some "odd" windows to deal with, because the
window interval did not properly reflect its contents; as a result, that
logic was not as encapsulated as it should have been.

I therefore created a PTransform "DailyWindowsWithContext" that first does
the contextual windowing ("ContextualCalendarDayWindow") and GBK. It then
"re-windows" the elements + their context by filtering out the
"context-only" groups, sets the timestamps of the remaining groups based on
the max of the element timestamps, and then outputs them into a fixed
daily window, followed by another GBK and flatten.

This seems to work quite well with my set of unit tests, though I haven't
used it extensively yet. If anyone is curious about the code (written in
Kotlin), see here:

https://gist.github.com/rocketraman/543f066813fc89590f23ff5dacf43f01

Feedback on this code would be more than welcome.

Regards,
Raman

>
> I am just writing this without coding, so I could certainly have missed
> something or gotten it wrong.
>
> Kenn
>
> On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta  wrote:
>
>> I have a 7-day sliding calendar window, sliding by 1 day. The intent is
>> to process only elements that fall into the last day of a window, but still
>> have access to the elements from the preceding six days.
>>
>> I created a sliding calendar window function, and trigger it like this:
>>
>> AfterWatermark.pastEndOfWindow()
>>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>>
>> Downstream of this pipeline I have a GBK and a DoFn that basically
>> ignores elements until at least some of them are in the last day of
>> the window.
>>
>> The above trigger works and the pipeline produces the expected output,
>> but runs the GBK and downstream logic many more times than is necessary.
>>
>> Is there a way I can optimize the triggering here such that the early
>> firings begin only when the watermark moves into the last day of the 7-day
>> window?
>>
>> Thanks,
>> Raman
>>
>>


Re: Triggering partway through a window

2021-03-26 Thread Raman Gupta
I experimented with
`AfterProcessingTime.pastFirstElementInPane().plusDelayOf(6.days)` which is
*almost* working, but delays elements on the last day depending on the
element timestamps from the first day.

I think I'd need something like
`AfterProcessingTime.pastStartOfWindow().plusDelayOf(6.days)`, but I'm
finding it difficult to understand how to create such a trigger, or if
something like that is even possible.

Regards,
Raman


On Fri, Mar 26, 2021 at 4:47 PM Raman Gupta  wrote:

> I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
> process only elements that fall into the last day of a window, but still
> have access to the elements from the preceding six days.
>
> I created a sliding calendar window function, and trigger it like this:
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>
> Downstream of this pipeline I have a GBK and a DoFn that basically ignores
> elements until at least some of them are in the last day of the window.
>
> The above trigger works and the pipeline produces the expected output, but
> runs the GBK and downstream logic many more times than is necessary.
>
> Is there a way I can optimize the triggering here such that the early
> firings begin only when the watermark moves into the last day of the 7-day
> window?
>
> Thanks,
> Raman
>
>


Triggering partway through a window

2021-03-26 Thread Raman Gupta
I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
process only elements that fall into the last day of a window, but still
have access to the elements from the preceding six days.

I created a sliding calendar window function, and trigger it like this:

AfterWatermark.pastEndOfWindow()
  .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
  .withLateFirings(AfterPane.elementCountAtLeast(1))

Downstream of this pipeline I have a GBK and a DoFn that basically ignores
elements until at least some of them are in the last day of the window.

The above trigger works and the pipeline produces the expected output, but
runs the GBK and downstream logic many more times than is necessary.

Is there a way I can optimize the triggering here such that the early
firings begin only when the watermark moves into the last day of the 7-day
window?

Thanks,
Raman


Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Raman Gupta
Thanks guys for all the help. I think the late-passthrough solution should
work for my use case.
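
For anyone finding this thread later, my understanding of Reza's
branch-upstream suggestion (quoted below) is roughly this (untested sketch;
Event and the tag names are hypothetical):

val onTimeTag = object : TupleTag<Event>() {}
val lateTag = object : TupleTag<Event>() {}

// split late panes off before the looping-timer DoFn ever sees them
val branches = input.apply("SplitLatePanes",
  ParDo.of(object : DoFn<Event, Event>() {
    @ProcessElement
    fun process(@Element e: Event, pane: PaneInfo, out: MultiOutputReceiver) {
      if (pane.timing == PaneInfo.Timing.LATE) {
        out.get(lateTag).output(e)
      } else {
        out.get(onTimeTag).output(e)
      }
    }
  }).withOutputTags(onTimeTag, TupleTagList.of(lateTag)))

val forTimer = branches.get(onTimeTag) // feed this to the looping timer
val late = branches.get(lateTag)       // flatten back in downstream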

On Wed, Jan 13, 2021 at 10:27 AM Reza Ardeshir Rokni 
wrote:

> Hi,
>
> Late data, in general, can be problematic for the looping timer pattern,
> as well as other use cases where the arrival of data in the @process
> function creates an EventTime-domain Timer. The solution you have, which
> essentially passes it through, is a nice option; another solution would be
> to deal with the late data in a branch upstream of the DoFn where the
> timer work is happening (via a multi output). The former is good if your
> downstream transforms are not expecting specific behaviours from the
> looping timer characteristics. The latter is often easier to use when you
> want to write your downstream transforms without having to think about
> dealing with late data.
>
> Cheers
>
> Reza
>
> On Wed, 13 Jan 2021 at 23:12, Jan Lukavský  wrote:
>
>> I think there still could be problems in some corner cases. The problem
>> is that elements considered 'late' by the timestamp combiner have a
>> different definition than what is marked as late in PaneInfo. So you can
>> have a corner case where PaneInfo would be ON_TIME, but the timestamp
>> would still be shifted to the end of the window. This would probably not
>> happen often, but it can happen. If that is fine for your use case, then
>> this could work.
>>
>> Jan
>> On 1/13/21 3:59 PM, Raman Gupta wrote:
>>
>> Hmm, I think I've found a simple solution... adding this to the beginning
>> of my looping timer @ProcessElement function:
>>
>> // late elements don't need to affect our looping timer,
>> // pass them through without modification
>> // this is kind of a work-around for
>> // https://issues.apache.org/jira/browse/BEAM-2262
>> // but I think makes sense in general for looping timers when
>> // there is no need to trigger timers after the window is done
>> if (paneInfo.timing == PaneInfo.Timing.LATE) {
>>   receiver.output(element)
>>   return
>> }
>>
>> At least all my unit tests are passing... is there any problem with this
>> approach?
>>
>> Thanks,
>> Raman
>>
>>
>> On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta 
>> wrote:
>>
>>> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
>>> replies below.
>>>
>>> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský  wrote:
>>>
>>>> Hi,
>>>>
>>>> yes, there is a possible non-determinism related to the timestamp
>>>> combiner. Timestamp combiners combine only elements that are not 'late'
>>>> ([1]), meaning elements whose timestamp does not precede the output
>>>> watermark of the GBK. Looking at the pipeline code, I suppose that
>>>> could be the cause.
>>>>
>>>
>>> Yes, the test stream in this test case does indeed send the element in
>>> question "late". Here is the setup:
>>>
>>> val base = Instant.EPOCH + 6.hours
>>> val xStream: TestStream = TestStream.create(coder)
>>>   .addElements(x["1"]) // this just initializes the looping timer
>>>   // advance watermark past end of window that would normally process x2
>>>   .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>>>   .addElements(x["2"]) // now we see the element
>>>   .advanceWatermarkToInfinity()
>>>
>>> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
>>> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>>>
>>> So I get your point that the timestamp combiner is not used for late
>>> elements, but if late elements are singly emitted as in this pipeline, why
>>> do any timestamp modification at all? I would expect them to arrive with
>>> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
>>> to 1970-01-01T07:34:59.999Z (this is the part that seems
>>> non-deterministic). What is the logic / reason behind the pipeline setting
>>> this element's timestamp to 1970-01-01T07:34:59.999Z?
>>>
>>>
>>>> You can make the pipeline deterministic by using
>>>> TimestampCombiner.END_OF_WINDOW (default).
>>>>
>>>
>>> It's definitely not ideal for this use case, but I'll consider it.
>>>
>>>
>>>> If you *need* to use the TimestampCombiner.EARLIEST, you can probably
>>>> do that by tweaking the looping timer stateful dofn and fix timestamps
>>>> t

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Raman Gupta
Hmm, I think I've found a simple solution... adding this to the beginning
of my looping timer @ProcessElement function:

// late elements don't need to affect our looping timer,
// pass them through without modification
// this is kind of a work-around for
https://issues.apache.org/jira/browse/BEAM-2262
// but I think makes sense in general for looping timers when
// there is no need to trigger timers after the window is done
if (paneInfo.timing == PaneInfo.Timing.LATE) {
  receiver.output(element)
  return
}

At least all my unit tests are passing... is there any problem with this
approach?

Thanks,
Raman


On Wed, Jan 13, 2021 at 9:42 AM Raman Gupta  wrote:

> (Replying to Reza) Yes, I am using TestStream for my unit test. Other
> replies below.
>
> On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> yes, there is a possible non-determinism related to the timestamp
>> combiner. Timestamp combiners combine only elements that are not 'late'
>> ([1]), meaning elements whose timestamp does not precede the output
>> watermark of the GBK. Looking at the pipeline code, I suppose that could
>> be the cause.
>>
>
> Yes, the test stream in this test case does indeed send the element in
> question "late". Here is the setup:
>
> val base = Instant.EPOCH + 6.hours
> val xStream: TestStream = TestStream.create(coder)
>   .addElements(x["1"]) // this just initializes the looping timer
>   // advance watermark past end of window that would normally process x2
>   .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
>   .addElements(x["2"]) // now we see the element
>   .advanceWatermarkToInfinity()
>
> Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
> the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.
>
> So I get your point that the timestamp combiner is not used for late
> elements, but if late elements are singly emitted as in this pipeline, why
> do any timestamp modification at all? I would expect them to arrive with
> their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
> to 1970-01-01T07:34:59.999Z (this is the part that seems
> non-deterministic). What is the logic / reason behind the pipeline setting
> this element's timestamp to 1970-01-01T07:34:59.999Z?
>
>
>> You can make the pipeline deterministic by using
>> TimestampCombiner.END_OF_WINDOW (default).
>>
>
> It's definitely not ideal for this use case, but I'll consider it.
>
>
>> If you *need* to use the TimestampCombiner.EARLIEST, you can probably do
>> that by tweaking the looping timer stateful dofn and fix timestamps there
>> (using timer output timestamp).
>>
>
> I had already tried that but the pipeline throws an error that the
> timestamp emitted cannot be earlier than the current element timestamp.
>
> Thanks,
> Raman
>
>
>
>>   Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-2262
>> On 1/12/21 5:26 PM, Raman Gupta wrote:
>>
>> Your reply made me realize I removed the condition from my local copy of
>> the looping timer that brings the timer forward if it encounters an earlier
>> element later in the stream:
>>
>> if (currentTimerValue == null || currentTimerValue >
>> nextTimerTimeBasedOnCurrentElement.getMillis()) {
>>
>>
>> Restoring that condition fixes that issue.
>>
>> However, the reason I removed that condition in the first place was
>> because it was making a unit test non-deterministic -- sometimes the
>> element timestamps into the looping timer didn't seem to match the element
>> timestamps according to the EARLIEST timestamp combiner defined, causing
>> the timer to execute an additional time.
>>
>> The pipeline:
>>
>> input
>>   // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>>   // https://issues.apache.org/jira/browse/BEAM-644
>>   .apply("XTimestamps", WithTimestamps
>>     .of { it.enteredAt.asJoda() }
>>     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>>   )
>>   .apply("FixedTickWindows",
>>     Window.into(FixedWindows.of(5.minutes.asJoda()))
>>       .triggering(
>>         AfterWatermark.pastEndOfWindow()
>>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>>           .withLateFirings(AfterPane.elementCountAtLeast(1))
>>       )
>>       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>>       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_A

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Raman Gupta
(Replying to Reza) Yes, I am using TestStream for my unit test. Other
replies below.

On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský  wrote:

> Hi,
>
> yes, there is a possible non-determinism related to the timestamp
> combiner. Timestamp combiners combine only elements that are not 'late'
> ([1]), meaning elements whose timestamp does not precede the output
> watermark of the GBK. Looking at the pipeline code, I suppose that could
> be the cause.
>

Yes, the test stream in this test case does indeed send the element in
question "late". Here is the setup:

val base = Instant.EPOCH + 6.hours
val xStream: TestStream = TestStream.create(coder)
  .addElements(x["1"]) // this just initializes the looping timer
  // advance watermark past end of window that would normally process x2
  .advanceWatermarkTo((base + 3.hours + 1.minutes).asJoda())
  .addElements(x["2"]) // now we see the element
  .advanceWatermarkToInfinity()

Here late element x["2"] has a timestamp of 1970-01-01T07:30:00.000Z and
the watermark at the time x["2"] is added is 1970-01-01T09:00:01.000Z.

So I get your point that the timestamp combiner is not used for late
elements, but if late elements are singly emitted as in this pipeline, why
do any timestamp modification at all? I would expect them to arrive with
their original timestamp, not one changed from 1970-01-01T07:30:00.000Z
to 1970-01-01T07:34:59.999Z (this is the part that seems
non-deterministic). What is the logic / reason behind the pipeline setting
this element's timestamp to 1970-01-01T07:34:59.999Z?


> You can make the pipeline deterministic by using
> TimestampCombiner.END_OF_WINDOW (default).
>

It's definitely not ideal for this use case, but I'll consider it.


> If you *need* to use the TimestampCombiner.EARLIEST, you can probably do
> that by tweaking the looping timer stateful dofn and fix timestamps there
> (using timer output timestamp).
>

I had already tried that but the pipeline throws an error that the
timestamp emitted cannot be earlier than the current element timestamp.

Thanks,
Raman

>   Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-2262
> On 1/12/21 5:26 PM, Raman Gupta wrote:
>
> Your reply made me realize I removed the condition from my local copy of
> the looping timer that brings the timer forward if it encounters an earlier
> element later in the stream:
>
> if (currentTimerValue == null || currentTimerValue >
> nextTimerTimeBasedOnCurrentElement.getMillis()) {
>
>
> Restoring that condition fixes that issue.
>
> However, the reason I removed that condition in the first place was
> because it was making a unit test non-deterministic -- sometimes the
> element timestamps into the looping timer didn't seem to match the element
> timestamps according to the EARLIEST timestamp combiner defined, causing
> the timer to execute an additional time.
>
> The pipeline:
>
> input
>   // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
>   // https://issues.apache.org/jira/browse/BEAM-644
>   .apply("XTimestamps", WithTimestamps
>     .of { it.enteredAt.asJoda() }
>     .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
>   )
>   .apply("FixedTickWindows",
>     Window.into(FixedWindows.of(5.minutes.asJoda()))
>       .triggering(
>         AfterWatermark.pastEndOfWindow()
>           .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>           .withLateFirings(AfterPane.elementCountAtLeast(1))
>       )
>       .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>       .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
>       .discardingFiredPanes()
>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>   )
>   .apply("KeyByUser", WithKeys.of { it.userId })
>   .apply("GroupByUser", GroupByKey.create())
>   .apply("GlobalWindowsLoopingStatefulTimer",
>     Window.into<KV<String, Iterable<X>>>(GlobalWindows())
>       .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>       .discardingFiredPanes()
>       .withTimestampCombiner(TimestampCombiner.EARLIEST)
>   )
>   .apply("LoopingStatefulTimer", ParDo.of(LoopingStatefulTimer(5.minutes,
>     (options.timerTimeoutDays ?: 30).days)))
>
>
> The looping timer receives an @Timestamp value in the process function of:
>
> 1970-01-01T07:34:59.999Z
>
> but the earliest timestamp of the (single) element in the elements
> iterable is:
>
> 1970-01-01T07:30:00.000Z
>
> I would have thought given my timestamp combiners on my windows that the
> timestamp should have been 07:30:00.000Z. Is there something wrong in my
> pipeline

Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Raman Gupta
Your reply made me realize I removed the condition from my local copy of
the looping timer that brings the timer forward if it encounters an earlier
element later in the stream:

if (currentTimerValue == null || currentTimerValue >
nextTimerTimeBasedOnCurrentElement.getMillis()) {


Restoring that condition fixes that issue.

However, the reason I removed that condition in the first place was because
it was making a unit test non-deterministic -- sometimes the element
timestamps into the looping timer didn't seem to match the element
timestamps according to the EARLIEST timestamp combiner defined, causing
the timer to execute an additional time.

The pipeline:

input
  // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
  // https://issues.apache.org/jira/browse/BEAM-644
  .apply("XTimestamps", WithTimestamps
    .of { it.enteredAt.asJoda() }
    .withAllowedTimestampSkew(Duration.INFINITE.asJoda())
  )
  .apply("FixedTickWindows",
    Window.into(FixedWindows.of(5.minutes.asJoda()))
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
          .withLateFirings(AfterPane.elementCountAtLeast(1))
      )
      .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
      .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
      .discardingFiredPanes()
      .withTimestampCombiner(TimestampCombiner.EARLIEST)
  )
  .apply("KeyByUser", WithKeys.of { it.userId })
  .apply("GroupByUser", GroupByKey.create())
  .apply("GlobalWindowsLoopingStatefulTimer",
    Window.into<KV<String, Iterable<X>>>(GlobalWindows())
      .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
      .discardingFiredPanes()
      .withTimestampCombiner(TimestampCombiner.EARLIEST)
  )
  .apply("LoopingStatefulTimer",
    ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days)))


The looping timer receives an @Timestamp value in the process function of:

1970-01-01T07:34:59.999Z

but the earliest timestamp of the (single) element in the elements iterable
is:

1970-01-01T07:30:00.000Z

I would have thought given my timestamp combiners on my windows that the
timestamp should have been 07:30:00.000Z. Is there something wrong in my
pipeline that is causing this non-deterministic behavior?

Thanks,
Raman

On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský  wrote:

> Hi Raman,
>
> can you share the details of the pipeline? How exactly are you using the
> looping timer? Timer as described in the linked blog post should be
> deterministic even when the order of the input elements is undefined.
> Does your logic depend on element ordering?
>
>   Jan
>
> On 1/12/21 3:18 PM, Raman Gupta wrote:
> > Hello, I am building and testing a pipeline with the direct runner.
> > The pipeline includes a looping timer -
> > https://beam.apache.org/blog/looping-timers/.
> >
> > For now, I am using JdbcIO to obtain my input data, though when put
> > into production the pipeline will use PubSubIO.
> >
> > I am finding that the looping timer begins producing outputs at a
> > random event time, which makes some sense given the randomization of
> > inputs in the direct runner. However, this makes the results of
> > executing my pipeline with the direct runner completely
> non-deterministic.
> >
> > So:
> >
> > 1) Is there a way to turn off this non-deterministic behavior, but
> > just for the GlobalWindow / LoopingTimer?
> >
> > 2) Perhaps alternatively, is there a way to "initialize" the looping
> > timer when the pipeline starts, rather than when it first sees an
> > element? Perhaps a side input?
> >
> > 3) Am I right in assuming that when I move this pipeline to pub/sub io
> > and operate it in streaming mode, this issue will go away?
> >
> > Thanks!
> > Raman
> >
>


Looping timer, global windows, and direct runner

2021-01-12 Thread Raman Gupta
Hello, I am building and testing a pipeline with the direct runner. The
pipeline includes a looping timer -
https://beam.apache.org/blog/looping-timers/.
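
(For context, a stripped-down version of the looping timer pattern from that
post looks something like the following sketch -- not my production code; the
interval and element types are placeholders:)

class LoopingTimerFn(private val interval: Duration) :
  DoFn<KV<String, Event>, KV<String, Event>>() {

  @TimerId("loopTimer")
  private val loopTimerSpec: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  @StateId("nextFire")
  private val nextFireSpec: StateSpec<ValueState<Long>> = StateSpecs.value()

  @ProcessElement
  fun process(
    c: ProcessContext,
    @TimerId("loopTimer") timer: Timer,
    @StateId("nextFire") nextFire: ValueState<Long>
  ) {
    // the first element seen for a key starts the loop -- which is why, with
    // the direct runner's input randomization, the loop can start at a
    // "random" event time
    if (nextFire.read() == null) {
      val first = c.timestamp().plus(interval)
      timer.set(first)
      nextFire.write(first.millis)
    }
    c.output(c.element())
  }

  @OnTimer("loopTimer")
  fun onTimer(
    c: OnTimerContext,
    @TimerId("loopTimer") timer: Timer,
    @StateId("nextFire") nextFire: ValueState<Long>
  ) {
    // a real version would emit its tick output here before re-arming; it
    // also needs a stop condition, or the timer loops forever
    val next = c.timestamp().plus(interval)
    timer.set(next)
    nextFire.write(next.millis)
  }
}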

For now, I am using JdbcIO to obtain my input data, though when put into
production the pipeline will use PubSubIO.

I am finding that the looping timer begins producing outputs at a random
event time, which makes some sense given the randomization of inputs in the
direct runner. However, this makes the results of executing my pipeline
with the direct runner completely non-deterministic.

So:

1) Is there a way to turn off this non-deterministic behavior, but just for
the GlobalWindow / LoopingTimer?

2) Perhaps alternatively, is there a way to "initialize" the looping timer
when the pipeline starts, rather than when it first sees an element?
Perhaps a side input?

3) Am I right in assuming that when I move this pipeline to pub/sub io and
operate it in streaming mode, this issue will go away?

Thanks!
Raman


Mocking business logic injected into DoFns

2020-12-28 Thread Raman Gupta
Hello, I have some DoFns which have properties containing business logic 
classes that are injected at pipeline construction time.

When testing these pipelines, I would like to mock this business logic, as it 
is quite complex, and easier to test independently of the pipeline.

However, my mocking framework does not generate serializable mocks, and my test 
therefore fails with a NotSerializableException.

Is there a way to relax this requirement for unit tests?
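
One workaround I'm considering (sketch only, and it assumes a single-JVM runner 
like the DirectRunner; ScoringFn, BusinessLogic, Input, and Output are placeholder 
names): keep the non-serializable mock in a static registry and serialize only a 
lookup key:

class ScoringFn(private val logicKey: String) : DoFn<Input, Output>() {

  companion object {
    // never serialized -- lives only in the (single) test JVM
    val registry = java.util.concurrent.ConcurrentHashMap<String, BusinessLogic>()
  }

  @ProcessElement
  fun process(c: ProcessContext) {
    // resolved at runtime on the worker, so the mock itself is never serialized
    val logic = registry.getValue(logicKey)
    c.output(logic.score(c.element()))
  }
}

// in the test:
//   ScoringFn.registry["test"] = mockBusinessLogic
//   pipeline.apply(ParDo.of(ScoringFn("test")))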