Thanks Robert, here's what I did with your advice.

After the early Join/GBK transformations:

collection
 .apply(WithTimestamps.of(...)
   .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE/10)))
 .apply(Window.<GeoPoiDaily>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
   .withAllowedLateness(new Duration(Long.MAX_VALUE/10))
   .discardingFiredPanes())
 .apply(WithKeys.of(...))
 .apply(Combine.perKey(...))
 .apply(TextIO.write()...);

I used Long.MAX_VALUE/10 rather than Long.MAX_VALUE to prevent an overflow
error; Long.MAX_VALUE/2 also works.
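
A minimal sketch of why the full Long.MAX_VALUE might overflow while a
fraction of it does not. It assumes the duration is added to an element
timestamp in millisecond long arithmetic; the thread does not say where
the error was actually raised, so this illustrates the arithmetic only,
not Beam's internals. The class and method names are hypothetical.

```java
import java.time.Instant;

public class SkewOverflowSketch {
    // Returns true if timestampMillis + durationMillis does not fit in a long.
    static boolean overflows(long timestampMillis, long durationMillis) {
        try {
            Math.addExact(timestampMillis, durationMillis);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Element timestamp taken from the error message quoted below.
        long ts = Instant.parse("2017-04-01T00:00:00Z").toEpochMilli();

        System.out.println(overflows(ts, Long.MAX_VALUE));      // full range
        System.out.println(overflows(ts, Long.MAX_VALUE / 10)); // value used above
        System.out.println(overflows(ts, Long.MAX_VALUE / 2));  // alternative
    }
}
```

Any positive timestamp added to Long.MAX_VALUE exceeds the long range,
whereas Long.MAX_VALUE/10 and Long.MAX_VALUE/2 leave ample headroom.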

Jacob

On Wed, Nov 8, 2017 at 5:54 PM, Robert Bradshaw <[email protected]> wrote:

> On Wed, Nov 8, 2017 at 5:33 PM, Jacob Marble <[email protected]> wrote:
> > Good evening. I'm trying to nail down windowing. The concept is clear, I'm
> > just struggling with writing a working pipeline. Tonight the goal is to
> > group events by key and window, in a batch pipeline. All data is "late"
> > because it's a batch pipeline, and I expect nothing to be dropped or
> > processed in a "late" context.
>
> Traditionally, in a batch pipeline we consider no data to be late, as
> we have perfect knowledge of the watermark.
>
> > I read sections 7 and 8 of the Beam Programming Guide roughly twice.
> > I sifted through the examples; WindowedWordCount is close, but it doesn't
> > use triggering, which is where (2b) is probably off track.
> >
> > 1)
> > PCollection is created through a series of transforms, including a
> > Join.leftOuterJoin(). Apply a timestamp with something simple:
> >
> > collection.apply("add window timestamp",
> >  ParDo.of(new DoFn<Foo, Foo>() {
> >   @ProcessElement
> >   public void map(ProcessContext context) {
> >    Foo element = context.element();
> >    Instant timestamp = new Instant(element.getActivityUnixSeconds() * 1000);
> >    context.outputWithTimestamp(element, timestamp);
> >   }
> >  }));
> >
> > This fails with "java.lang.IllegalArgumentException: Cannot output with
> > timestamp 2017-04-01T00:00:00.000Z. Output timestamps must be no earlier
> > than the timestamp of the current input (294247-01-09T04:00:54.775Z) minus
> > the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> > Javadoc for details on changing the allowed skew."
> >
> > Is this expected? I don't care about skew, just want to set the timestamp
> > per element.
> >
> > I worked around this by applying the timestamp earlier in the pipeline,
> > right after a TextIO.read(). Why does that fix the problem?
>
> I suspect that the very-far-in-the-future timestamp is the end of
> the global window, which is assigned as the element timestamp as the
> result of a group-by-key.
>
> You can set your timestamps earlier, as you have done, but in this
> case they will get reset after passing through any GBK. It's possible
> you could get what you want by setting TimestampCombiner to EARLIEST
> (see https://github.com/apache/beam/blob/v2.1.1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L47)
> but probably the right solution is to set the allowed timestamp skew
> to infinity (or Long.MAX_VALUE or similar).
>
> Generally this skew is needed in streaming to hold the watermark back
> the right amount... Definitely not intuitive in your case; we should
> think if there's something better we could do here.
>
> > 2a)
> > After applying the timestamp, let's window!
> >
> > collection.apply("window into sessions",
> >  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
> >  .apply("key by something, reduce")
> >  .apply(TextIO.write()...)
> >
> > Now I see an output file, what joy! But the output file is empty. I
> > confirmed that the PCollection feeding TextIO.write() is seeing data. Maybe
> > this is because the default trigger is incorrect for my use case? I
> > expected not to need triggering in batch context, but the DefaultTrigger
> > Javadoc makes me believe otherwise.
> >
> > 2b)
> > How about the Never.ever() trigger? Javadoc: "Using this trigger will only
> > produce output when the watermark passes the end of the {@link BoundedWindow
> > window}". I don't know, but let's try. There's some error about allowed
> > lateness and firing panes, so I'll try values that look standard:
> >
> > collection.apply("window into sessions",
> >  Window.<Foo>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
> >   .triggering(Never.ever())
> >   .withAllowedLateness(Duration.standardDays(1))
> >   .discardingFiredPanes())
> >  .apply("key by something, reduce")
> >  .apply(TextIO.write()...)
> >
> > This yields a new error:
> > "java.lang.IllegalStateException: TimestampCombiner moved element from
> > 294247-01-09T04:10:54.774Z to earlier time 294247-01-09T04:00:54.775Z (end
> > of global window) for window
> > org.apache.beam.sdk.transforms.windowing.GlobalWindow"
> >
> > So I'm probably looking in the wrong place.
>
> I think if you resolve the issues above then this will take care of itself.
>
> - Robert
>
