Thanks Robert, here's what I did with your advice. After the early Join/GBK transformations:
collection
    .apply(WithTimestamps.of(...).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE/10)))
    .apply(Window.<GeoPoiDaily>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
        .withAllowedLateness(new Duration(Long.MAX_VALUE/10))
        .discardingFiredPanes())
    .apply(WithKeys.of(...))
    .apply(Combine.perKey(...))
    .apply(TextIO.write()...);

I used Long.MAX_VALUE/10 to prevent an overflow error; Long.MAX_VALUE/2 also works.

Jacob

On Wed, Nov 8, 2017 at 5:54 PM, Robert Bradshaw wrote:

> On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble wrote:
> > Good evening. I'm trying to nail down windowing. The concept is clear, just
> > struggling with writing a working pipeline. Tonight the goal is to group
> > events by key and window, in a batch pipeline. All data is "late" because
> > it's a batch pipeline, and I expect nothing to be dropped or processed in
> > a "late" context.
>
> Traditionally, in a batch pipeline we consider no data to be late, as
> we have perfect knowledge of the watermark.
>
> > Read sections 7 and 8 of the Beam Programming Guide roughly twice.
> > Sifted through the examples; WindowedWordCount is close, but it doesn't
> > use triggering, which is where (2b) is probably off track.
> >
> > 1)
> > PCollection is created through a series of transforms, including a
> > Join.leftOuterJoin(). Apply a timestamp with something simple:
> >
> > collection.apply("add window timestamp",
> >     ParDo.of(new DoFn<Foo, Foo>() {
> >       @ProcessElement
> >       public void map(ProcessContext context) {
> >         Foo element = context.element();
> >         Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
> >         context.outputWithTimestamp(element, timestamp);
> >       }
> >     }));
> >
> > This fails with "java.lang.IllegalArgumentException: Cannot output with
> > timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> > than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> > the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> > Javadoc for details on changing the allowed skew."
> >
> > Is this expected? I don't care about skew, just want to set the timestamp
> > per element.
> >
> > I worked around this by applying the timestamp earlier in the pipeline,
> > right after a TextIO.read(). Why does that fix the problem?
>
> I would suspect that very-far-in-the-future timestamp is the end of
> the global window, set as the timestamp as the result of a
> group-by-key.
>
> You can set your timestamps earlier, as you have done, but in this
> case they will get reset after passing through any GBK. It's possible
> you could get what you want by setting TimestampCombiner to EARLIEST
> (see https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
> but probably the right solution is to set the allowed timestamp skew
> to infinity (or Long.MAX_VALUE or similar).
>
> Generally this skew is needed in streaming to hold the watermark back
> the right amount... Definitely not intuitive in your case; we should
> think if there's something better we could do here.
>
> > 2a)
> > After applying the timestamp, let's window!
> >
> > collection.apply("window into sessions",
> >     Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
> >   .apply("key by something, reduce")
> >   .apply(TextIO.write()...)
> >
> > Now I see an output file, what joy! But the output file is empty. Confirmed
> > that the PCollection feeding TextIO.write() is seeing data. Maybe this is
> > because the default trigger is incorrect for my use case? I expected not to
> > need triggering in batch context, but the DefaultTrigger Javadoc makes me
> > believe otherwise.
> >
> > 2b)
> > How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> > produce output when the watermark passes the end of the {@link BoundedWindow
> > window}". I don't know, but let's try. There's some error about allowed
> > lateness and firing panes, so I'll try values that look standard:
> >
> > collection.apply("window into sessions",
> >     Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
> >         .triggering(Never.ever())
> >         .withAllowedLateness(Duration.standardDays(1))
> >         .discardingFiredPanes())
> >   .apply("key by something, reduce")
> >   .apply(TextIO.write()...)
> >
> > This yields a new error:
> > "java.lang.IllegalStateException: TimestampCombiner moved element from
> > 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> > of global window) for window
> > org.apache.beam.sdk.transforms.windowing.GlobalWindow"
> >
> > So I'm probably looking in the wrong place.
>
> I think if you resolve the issues above then this will take care of itself.
>
> - Robert

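The note about Long.MAX_VALUE/10 can be illustrated with plain arithmetic. This sketch assumes (not verified against the runner source) that the overflow came from adding the allowed lateness, in milliseconds as Joda's Duration(long) takes it, to a window's end timestamp; LatenessOverflowModel and deadline are hypothetical names.

```java
import java.time.Instant;
import java.util.OptionalLong;

/** Hypothetical illustration of long overflow when adding a huge lateness to a timestamp. */
public class LatenessOverflowModel {
    /** Window end plus lateness in epoch millis, or empty if the sum overflows a long. */
    static OptionalLong deadline(long windowEndMillis, long latenessMillis) {
        try {
            return OptionalLong.of(Math.addExact(windowEndMillis, latenessMillis));
        } catch (ArithmeticException overflow) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        // Example: end of a 10-minute session containing a single event at 2017-04-01T00:00:00Z.
        long sessionEnd = Instant.parse("2017-04-01T00:10:00Z").toEpochMilli();
        System.out.println(deadline(sessionEnd, Long.MAX_VALUE).isPresent());      // false
        System.out.println(deadline(sessionEnd, Long.MAX_VALUE / 2).isPresent());  // true
        System.out.println(deadline(sessionEnd, Long.MAX_VALUE / 10).isPresent()); // true
    }
}
```

Any positive timestamp plus Long.MAX_VALUE overflows a long, whereas Long.MAX_VALUE/2 or Long.MAX_VALUE/10 still leaves headroom, even for timestamps as late as the end of the global window.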