Good evening. I'm trying to nail down windowing. The concept is clear; I'm just
struggling to write a working pipeline. Tonight the goal is to group events by
key and window in a batch pipeline. All data is "late" because it's a batch
pipeline, and I expect nothing to be dropped or processed in a "late" context.

I've read sections 7 and 8 of the Beam Programming Guide roughly twice and
sifted through the examples. WindowedWordCount is close, but it doesn't use
triggering, which is probably where (2b) goes off track.

1)
The PCollection is created through a series of transforms, including a
Join.leftOuterJoin(). I apply a timestamp with something simple:

collection.apply("add window timestamp",
 ParDo.of(new DoFn<Foo, Foo>() {
  @ProcessElement
  public void map(ProcessContext context) {
   Foo element = context.element();
   Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
   context.outputWithTimestamp(element, timestamp);
  }
 }));

This fails with "java.lang.IllegalArgumentException: Cannot output with
timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
Javadoc for details on changing the allowed skew."

*Is this expected? I don't care about skew; I just want to set the timestamp
per element.*

I worked around this by applying the timestamp earlier in the pipeline,
right after a TextIO.read(). Why does that fix the problem?
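A plain-Java model of the check behind that exception, not Beam code: java.time stands in for Joda-Time, and the constants are stand-ins rather than Beam fields. It assumes two things. First, elements coming out of the join carry the "current input" timestamp from the error, 294247-01-09T04:00:54.775Z, which the error in (2b) labels the end of the global window. Second, elements straight out of TextIO.read() carry a minimum timestamp. Under those assumptions, moving a timestamp forward from the minimum to 2017 passes the rule, while moving it back from the end of the global window fails, which would be consistent with the workaround.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java model of the timestamp-skew rule quoted in the exception.
// Not Beam API; all constants are stand-ins chosen to match the error text.
class SkewCheckModel {

    // Stand-in for the "current input" timestamp in the error
    // (294247-01-09T04:00:54.775Z): Long.MAX_VALUE microseconds in millis,
    // minus one day.
    static final Instant END_OF_GLOBAL_WINDOW =
            Instant.ofEpochMilli(Long.MAX_VALUE / 1000 - Duration.ofDays(1).toMillis());

    // Assumed stand-in for the minimum timestamp on elements fresh out of a
    // bounded read such as TextIO.read(): Long.MIN_VALUE microseconds in millis.
    static final Instant MIN_TIMESTAMP = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);

    // 2017-04-01T00:00:00Z, the timestamp the DoFn tried to output.
    static final Instant EVENT_TIME = Instant.ofEpochSecond(1_491_004_800L);

    // The rule from the message: an output timestamp must be no earlier than
    // the input timestamp minus the allowed skew.
    static boolean outputAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Input stamped at the end of the global window (as in the error).
        System.out.println("input at end of global window: "
                + outputAllowed(END_OF_GLOBAL_WINDOW, EVENT_TIME, Duration.ZERO));
        // Input stamped at the minimum (assumed for elements right after the read).
        System.out.println("input at minimum timestamp: "
                + outputAllowed(MIN_TIMESTAMP, EVENT_TIME, Duration.ZERO));
    }
}
```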

2a)
After applying the timestamp, let's window!

collection.apply("window into sessions",
 Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

Now I see an output file, what joy! *But the output file is empty.* I
confirmed that the PCollection feeding TextIO.write() is seeing data. Maybe
this is because the default trigger is wrong for my use case? I expected not
to need triggering in a batch context, but the DefaultTrigger Javadoc makes me
believe otherwise.
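For comparison with the empty file, a plain-Java model (not Beam API) of the output one might expect from the session windowing above, for the events of a single key. It assumes sessions behave as the Programming Guide describes: each element gets a window [timestamp, timestamp + gap), and overlapping windows merge. Treating windows that merely touch as separate is an assumption about the boundary rule. With a 10-minute gap, hypothetical events at 0, 5 and 20 minutes should yield two sessions.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of session windowing for one key. Not Beam API.
class SessionModel {

    // Half-open interval [start, end) in epoch millis.
    static final class Session {
        final long start; // inclusive
        final long end;   // exclusive

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Give each timestamp the window [ts, ts + gap) and merge overlapping ones.
    static List<Session> sessions(long[] timestampsMillis, Duration gap) {
        long[] sorted = timestampsMillis.clone();
        Arrays.sort(sorted);
        List<Session> result = new ArrayList<>();
        for (long ts : sorted) {
            long end = ts + gap.toMillis();
            if (!result.isEmpty() && ts < result.get(result.size() - 1).end) {
                // Overlaps the current session: extend it.
                Session last = result.remove(result.size() - 1);
                result.add(new Session(last.start, Math.max(last.end, end)));
            } else {
                // Starts at or after the current session's end: new session.
                result.add(new Session(ts, end));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = Duration.ofMinutes(1).toMillis();
        // Hypothetical events for one key at 0, 5 and 20 minutes.
        long[] events = {0, 5 * minute, 20 * minute};
        System.out.println(sessions(events, Duration.ofMinutes(10)));
    }
}
```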

2b)
How about the Never.ever() trigger? Javadoc: "Using this trigger will only
produce output when the watermark passes the end of the {@link
BoundedWindow window}". I'm not sure it fits, but let's try. There's an error
about allowed lateness and firing panes, so I'll try values that look
standard:

collection.apply("window into sessions",
 Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
  .triggering(Never.ever())
  .withAllowedLateness(Duration.standardDays(1))
  .discardingFiredPanes())
 .apply("key by something, reduce")
 .apply(TextIO.write()...)

This yields a new error:
"java.lang.IllegalStateException: TimestampCombiner moved element from
294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
of global window) for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow"

So I'm probably looking in the wrong place.
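One hedged reading of this error, checked with plain-Java arithmetic rather than Beam code: the "from" timestamp 294247-01-09T04:10:54.774Z is exactly 10 minutes minus 1 ms after the end of the global window (294247-01-09T04:00:54.775Z). That is the max timestamp (window end minus 1 ms) a 10-minute session window would have if it were built from an element whose timestamp is already the end of the global window, such as the input timestamp seen in (1). The constant below is a stand-in derived from Long.MAX_VALUE, not a Beam field.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java arithmetic check, not Beam API: a session window built from an
// element stamped at the end of the global window extends past that point.
class SessionPastGlobalWindow {

    // Stand-in for 294247-01-09T04:00:54.775Z ("end of global window" in the
    // error): Long.MAX_VALUE microseconds in millis, minus one day.
    static final long END_OF_GLOBAL_WINDOW_MS =
            Long.MAX_VALUE / 1000 - Duration.ofDays(1).toMillis();

    // Max timestamp of a session window [ts, ts + gap): the end minus 1 ms.
    static long sessionMaxTimestamp(long elementTsMillis, Duration gap) {
        return elementTsMillis + gap.toMillis() - 1;
    }

    public static void main(String[] args) {
        long max = sessionMaxTimestamp(END_OF_GLOBAL_WINDOW_MS, Duration.ofMinutes(10));
        System.out.println("end of global window:  " + Instant.ofEpochMilli(END_OF_GLOBAL_WINDOW_MS));
        System.out.println("session max timestamp: " + Instant.ofEpochMilli(max));
        System.out.println("difference in ms:      " + (max - END_OF_GLOBAL_WINDOW_MS));
    }
}
```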

Thanks!

Jacob
