I want to just highlight some aspects of the docs for this thread:

- getAllowedTimestampSkew() does not adjust the watermark, so it can be dangerous, and you have to carefully set up allowed lateness too.
- The replacement for getAllowedTimestampSkew does not exist yet, but is described in https://issues.apache.org/jira/browse/BEAM-644
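To make the first point concrete, here is a simplified, self-contained model in plain Java (java.time only, not Beam's API). It makes three assumptions:

- Fixed windows are aligned to the epoch.
- An element counts as droppable once its window's last instant plus the allowed lateness is earlier than the watermark.
- The watermark has already advanced to the input timestamp from the error message quoted below.

The class name `LatenessModel` and its methods are hypothetical. The model uses the 10-minute FixedWindows and 20-minute allowed lateness from the quoted pipeline. Under those settings, an output backdated by about 19 seconds is still kept. An output backdated by an extra hour lands in a window that has already expired.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (NOT Beam's implementation) of late-data dropping for fixed windows.
public class LatenessModel {
    // Start of the epoch-aligned fixed window that contains ts.
    static Instant windowStart(Instant ts, Duration size) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()));
    }

    // Assumed rule: droppable once (last instant of the window + allowed lateness) < watermark.
    static boolean isDroppable(Instant ts, Instant watermark, Duration size, Duration allowedLateness) {
        Instant windowMax = windowStart(ts, size).plus(size).minusMillis(1);
        return windowMax.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(10);      // FixedWindows.of(10 minutes) from the thread
        Duration lateness = Duration.ofMinutes(20);  // withAllowedLateness(20 minutes) from the thread
        // Assumption: the watermark has reached the input element's timestamp.
        Instant watermark = Instant.parse("2019-03-27T17:47:00.526Z");

        // Output timestamp from the error message: window [17:40, 17:50), still within lateness.
        System.out.println(isDroppable(Instant.parse("2019-03-27T17:46:41.893Z"), watermark, size, lateness));
        // Hypothetical output backdated by a further hour: window [16:40, 16:50) has expired.
        System.out.println(isDroppable(Instant.parse("2019-03-27T16:46:41.893Z"), watermark, size, lateness));
    }
}
```

A larger skew only relaxes the check on output timestamps. Nothing holds the watermark back to match, so the further back a record is dated, the more likely it falls into a window the watermark has already passed.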

I was browsing the code & docs of TextIO / FileIO / MatchConfiguration and I did not see any way to influence the input event timestamps or watermark. Does it exist?

Kenn

On Wed, Mar 27, 2019 at 12:06 PM Robin Qiu wrote:
> Hi Nikhil,
>
> If I understand it correctly, your DoFn (KeyVal) class may emit records
> whose timestamp is before the input's. In this case I think you could
> override the default behavior of getAllowedTimestampSkew() in DoFn to allow
> infinite skew.
>
> To be specific, you can add the following function to your KeyVal class:
>
> @Override
> public Duration getAllowedTimestampSkew() {
>   return Duration.millis(Long.MAX_VALUE);
> }
>
> Hope this helps!
> Robin
>
> On Wed, Mar 27, 2019 at 11:44 AM Nikhil Goyal wrote:
>> Hi guys,
>> I am seeing this error while running my Beam job on Dataflow.
>>
>> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.IllegalArgumentException: Cannot output with timestamp
>> 2019-03-27T17:46:41.893Z. Output timestamps must be no earlier than the
>> timestamp of the current input (2019-03-27T17:47:00.526Z) minus the allowed
>> skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
>> details on changing the allowed skew.
>>
>> This is the code. It's quite simple.
>>
>> static class KeyVal extends DoFn<String, KV<String, Long>> {
>>   @ProcessElement
>>   public void processElement(@Element String r, OutputReceiver<KV<String, Long>> out) {
>>     String[] keyval = r.split(" ");
>>     out.outputWithTimestamp(KV.of(keyval[0], Long.parseLong(keyval[1])),
>>         Instant.ofEpochMilli(Long.parseLong(keyval[2])));
>>   }
>> }
>>
>> TextIO.Read source =
>>     TextIO.read().from(INPUT_PATH).watchForNewFiles(Duration.standardMinutes(1),
>>         Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(2)));
>>
>> pipeline
>>     .apply(source)
>>     .apply(ParDo.of(new KeyVal()))
>>     .apply(
>>         Window.<KV<String, Long>>into(
>>             FixedWindows.of(Duration.standardMinutes(10))
>>         ).triggering(
>>             Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(2)))
>>         ).withAllowedLateness(Duration.standardMinutes(20)).accumulatingFiredPanes()
>>     )...
>>
>> I got it working using
>>
>> .apply(WithTimestamps.of((String line) ->
>>     Instant.ofEpochMilli(Long.parseLong(line.split(" ")[2])))
>>     .withAllowedTimestampSkew(Duration.millis(Long.MAX_VALUE / 2L)))
>>
>> Documentation says withAllowedTimestampSkew is deprecated and using
>> withAllowedLateness should take care of it. Is there a better way to add
>> timestamps to records? Thanks
>>
>> Nikhil
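The error in this thread comes from a check on output timestamps. Below is a simplified model of that rule, taken from the wording of the error message: the output must be no earlier than the input timestamp minus the allowed skew. It is combined with the same "key value epochMillis" parsing that KeyVal does. The code is plain Java using java.time only. `SkewCheckModel`, `Parsed`, `parse` and `isOutputAllowed` are hypothetical names, not Beam APIs, and the line format is assumed from the quoted code.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (NOT Beam's implementation) of the output-timestamp skew check.
public class SkewCheckModel {
    // Hypothetical holder for one parsed "key value epochMillis" line.
    static final class Parsed {
        final String key;
        final long value;
        final Instant timestamp;

        Parsed(String key, long value, Instant timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Same split-and-parse steps as the KeyVal DoFn in the thread.
    static Parsed parse(String line) {
        String[] parts = line.split(" ");
        return new Parsed(parts[0], Long.parseLong(parts[1]),
                Instant.ofEpochMilli(Long.parseLong(parts[2])));
    }

    // Rule as worded in the error: output must not be earlier than (input - allowed skew).
    static boolean isOutputAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Timestamps taken from the error message in the thread.
        Instant input = Instant.parse("2019-03-27T17:47:00.526Z");
        long outputMillis = Instant.parse("2019-03-27T17:46:41.893Z").toEpochMilli();
        Parsed p = parse("someKey 42 " + outputMillis); // hypothetical input line

        // Zero skew (the default in the error): the backdated output is rejected.
        System.out.println(isOutputAllowed(input, p.timestamp, Duration.ZERO));
        // A one-minute skew covers the ~19 s gap, so the output passes the check.
        System.out.println(isOutputAllowed(input, p.timestamp, Duration.ofMinutes(1)));
    }
}
```

Both workarounds in the thread widen the allowed skew in this check:

- Robin's suggestion overrides getAllowedTimestampSkew() on KeyVal.
- Nikhil's fix uses WithTimestamps with withAllowedTimestampSkew.

Per Kenn's note, neither moves the watermark. The windowing's allowed lateness still decides whether the backdated records are kept.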
