On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble <jmar...@kochava.com> wrote:
> Good evening. I'm trying to nail down windowing. The concept is clear, just
> struggling with writing a working pipeline. Tonight the goal is group events
> by key and window, in a batch pipeline. All data is "late" because it's a
> batch pipeline, and I expect nothing to be dropped or processed in a "late"
> context.

Traditionally, in a batch pipeline we consider no data to be late, as
we have perfect knowledge of the watermark.

> Read section 7 and 8 of the Beam Programming Guide roughly twice.
> Sifted through the examples, WindowedWordCount is close, but it doesn't use
> triggering, which is where (2b) is probably off track.
>
> 1)
> PCollection is created through a series of transforms, including a
> Join.leftOuterJoin(). Apply a timestamp with something simple:
>
> collection.apply("add window timestamp",
>  ParDo.of(new DoFn<Foo, Foo>() {
>   @ProcessElement
>   public void map(ProcessContext context) {
>    Foo element = context.element();
>    Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
>    context.outputWithTimestamp(element, timestamp);
>   }
>  }));
>
> This fails with "java.lang.IllegalArgumentException: Cannot output with
> timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> Javadoc for details on changing the allowed skew."
>
> Is this expected? I don't care about skew, just want to set the timestamp
> per element.
>
> I worked around this by applying the timestamp earlier in the pipeline,
> right after a TextIO.read(). Why does that fix the problem?

I suspect that the very-far-in-the-future timestamp is the end of
the global window, which is assigned as the element timestamp on the
output of a group-by-key.
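
For what it's worth, the timestamp in your error message can be
reproduced with plain arithmetic. The sketch below assumes (from my
memory of the Java SDK, not checked against the source) that the maximum
timestamp is Long.MAX_VALUE microseconds truncated to milliseconds, and
that the global window ends one day before that. The class and method
names are made up for illustration and are not Beam API.

```java
import java.time.Duration;
import java.time.Instant;

public class GlobalWindowEnd {
    // Assumption: the largest representable timestamp is Long.MAX_VALUE
    // microseconds, expressed in milliseconds (integer division truncates).
    static final long MAX_TIMESTAMP_MILLIS = Long.MAX_VALUE / 1000;

    // Assumption: the global window ends one day before the maximum timestamp.
    static Instant endOfGlobalWindow() {
        return Instant.ofEpochMilli(MAX_TIMESTAMP_MILLIS).minus(Duration.ofDays(1));
    }

    public static void main(String[] args) {
        // java.time prefixes years beyond 9999 with '+', so this prints
        // +294247-01-09T04:00:54.775Z, the input timestamp from the error.
        System.out.println(endOfGlobalWindow());
    }
}
```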

You can set your timestamps earlier, as you have done, but in this
case they will get reset after passing through any GBK. It's possible
you could get what you want by setting TimestampCombiner to EARLIEST
(see 
https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
but probably the right solution is to set the allowed timestamp skew
to infinity (or Long.MAX_VALUE or similar).
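
To illustrate why a very large skew makes the error go away, here is a
rough model of the comparison the error message describes: the output
timestamp must be no earlier than the input timestamp minus the allowed
skew. This is only a sketch in plain Java, not Beam's implementation;
the class and method names are hypothetical, and the input timestamp is
assumed to be the end of the global window as computed above. In Beam
itself the skew would come from overriding
DoFn#getAllowedTimestampSkew(), as the error message says.

```java
import java.time.Instant;

public class SkewCheck {
    /** Models "output must be no earlier than input minus allowed skew". */
    static boolean isAllowed(long outputMillis, long inputMillis, long allowedSkewMillis) {
        long earliestAllowed;
        try {
            earliestAllowed = Math.subtractExact(inputMillis, allowedSkewMillis);
        } catch (ArithmeticException overflow) {
            // The skew is so large that no output timestamp can be too early.
            earliestAllowed = Long.MIN_VALUE;
        }
        return outputMillis >= earliestAllowed;
    }

    public static void main(String[] args) {
        // Assumed input timestamp: end of the global window, in milliseconds.
        long input = Long.MAX_VALUE / 1000 - 86_400_000L;
        // The output timestamp from the error message.
        long output = Instant.parse("2017-04-01T00:00:00Z").toEpochMilli();

        System.out.println(isAllowed(output, input, 0L));             // false
        System.out.println(isAllowed(output, input, Long.MAX_VALUE)); // true
    }
}
```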

Generally this skew is needed in streaming to hold the watermark back
the right amount... Definitely not intuitive in your case; we should
think about whether there's something better we could do here.

> 2a)
> After applying the timestamp, let's window!
>
> collection.apply("window into sessions",
>  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> Now I see an output file, what joy! But the output file is empty. Confirmed
> that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> because the default trigger is incorrect for my use case? I expected not to
> need triggering in batch context, but the DefaultTrigger Javadoc makes me
> believe otherwise.
>
> 2b)
> How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> produce output when the watermark passes the end of the {@link BoundedWindow
> window}". I don't know, but let's try. There's some error about allowed
> lateness and firing panes, so I'll try values that look standard:
>
> collection.apply("window into sessions",
>  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
>   .triggering(Never.ever())
>   .withAllowedLateness(Duration.standardDays(1))
>   .discardingFiredPanes())
>  .apply("key by something, reduce")
>  .apply(TextIO.write()...)
>
> This yields a new error:
> "java.lang.IllegalStateException: TimestampCombiner moved element from
> 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> of global window) for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow"
>
> So I'm probably looking in the wrong place.

I think if you resolve the issues above then this will take care of itself.
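
If I'm reading the message right, the numbers in it point at the same
root cause. The sketch below assumes a session window for a single
element spans [t, t + gap) and that its maximum timestamp is one
millisecond before its end; with t at the end of the global window and
your 10 minute gap, that gives exactly the "from" timestamp in the
error. The names are hypothetical and this is plain Java, not Beam API.

```java
import java.time.Duration;
import java.time.Instant;

public class SessionWindowMaxTimestamp {
    // Assumption: a single-element session window is [t, t + gap), and its
    // maximum timestamp is the end of the window minus one millisecond.
    static Instant maxTimestamp(Instant elementTimestamp, Duration gap) {
        return elementTimestamp.plus(gap).minusMillis(1);
    }

    public static void main(String[] args) {
        // Assumed end of the global window: max timestamp minus one day.
        Instant endOfGlobalWindow =
            Instant.ofEpochMilli(Long.MAX_VALUE / 1000).minus(Duration.ofDays(1));

        // Prints +294247-01-09T04:10:54.774Z, which matches the "from"
        // timestamp in the IllegalStateException (java.time adds the '+').
        System.out.println(maxTimestamp(endOfGlobalWindow, Duration.ofMinutes(10)));
    }
}
```

So elements that still carry the end-of-global-window timestamp would
produce that session window, which is why fixing the timestamps should
make this error disappear too.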

- Robert
