I found in the following method that the time is deliberetly calculated within
the past 2 hours.On the other hand, I get the following exception complaining
why its in the past!!!I appreciate any clarification...
public class WindowedWordCount {
static class AddTimestampFn extends DoFn<KV<byte[], String>, String> {
private static final Duration RAND_RANGE = Duration.standardHours(2);
private final Instant minTimestamp;
AddTimestampFn() { this.minTimestamp = new
Instant(System.currentTimeMillis()); }
@Override public void processElement(ProcessContext c) { //
Generate a timestamp that falls somewhere in the past two hours. long
randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); Instant
randomTimestamp = minTimestamp.plus(randMillis); /** * Concept #2:
Set the data element with that timestamp. */
c.outputWithTimestamp(c.element().toString() , new Instant(randomTimestamp));
} }
From: amir bahmanyari <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Wednesday, August 3, 2016 7:25 PM
Subject: changing the allowed skew
Hi Colleagues,I am basically running the code in example WindowedWordCount.The
only difference is that I dont TextIO but get records via KakkaIO.Everything
else the same. I get the following exception.Appreciate your suggestions to fix
it..Cheers
Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp
2016-08-04T02:23:21.137Z. Output timestamps must be no earlier than the
timestamp of the current input (2016-08-04T02:23:22.896Z) minus the allowed
skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew() Javadoc for
details on changing the allowed skew. at
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)