Hi Amir,

I believe you should use KafkaIO#withTimestampFn [1]. For unbounded PCollections, the source itself needs to know about the timestamps so it can maintain a good watermark.
The example you are editing uses a bounded input, which has different implications for the watermark. The text in the comment seems to have strayed from the example code [2].

Hope this helps,

Kenn

[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
[2] https://github.com/apache/incubator-beam/commit/691a5828b45423940e850bbc64ccc5daf7599c76#diff-a1ce476065c2eb973a02d794d29758e0

On Thu, Aug 4, 2016 at 11:38 AM, amir bahmanyari <[email protected]> wrote:

> I found in the following method that the time is deliberately calculated
> within the past 2 hours.
> On the other hand, I get the following exception complaining that it is in
> the past!
> I appreciate any clarification...
>
> public class WindowedWordCount {
>
>   static class AddTimestampFn extends DoFn<KV<byte[], String>, String> {
>     private static final Duration RAND_RANGE = Duration.standardHours(2);
>     private final Instant minTimestamp;
>
>     AddTimestampFn() {
>       this.minTimestamp = new Instant(System.currentTimeMillis());
>     }
>
>     @Override
>     public void processElement(ProcessContext c) {
>       // Generate a timestamp that falls somewhere in the past two hours.
>       long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
>       Instant randomTimestamp = minTimestamp.plus(randMillis);
>       /**
>        * Concept #2: Set the data element with that timestamp.
>        */
>       c.outputWithTimestamp(c.element().toString(), new Instant(randomTimestamp));
>     }
>   }
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Wednesday, August 3, 2016 7:25 PM
> *Subject:* changing the allowed skew
>
> Hi Colleagues,
> I am basically running the code in example WindowedWordCount.
> The only difference is that I don't use TextIO but get records via KafkaIO.
> Everything else is the same. I get the following exception.
> Appreciate your suggestions to fix it.
> Cheers
>
>
> Caused by: java.lang.IllegalArgumentException: Cannot output with
> timestamp 2016-08-04T02:23:21.137Z. Output timestamps must be no earlier
> than the timestamp of the current input (2016-08-04T02:23:22.896Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew()
> Javadoc for details on changing the allowed skew.
> at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)
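
For reference, the rule stated in the exception message quoted above can be sketched in plain Java. This is only a simplified model written from the wording of the message, not the runner's actual checkTimestamp code; the class name TimestampSkewCheck and the method isAllowed are hypothetical, and it uses java.time rather than the Joda types from the example.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSkewCheck {

    /**
     * Simplified model of the rule in the exception message (an assumption,
     * not Beam's code): an output timestamp is accepted only if it is no
     * earlier than (input timestamp - allowed skew).
     */
    static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        Instant earliestAllowed = input.minus(allowedSkew);
        return !output.isBefore(earliestAllowed);
    }

    public static void main(String[] args) {
        // Timestamps copied from the exception message.
        Instant input = Instant.parse("2016-08-04T02:23:22.896Z");
        Instant output = Instant.parse("2016-08-04T02:23:21.137Z");

        // With the reported skew of 0 ms, the output is about 1.76 s too early.
        System.out.println(isAllowed(input, output, Duration.ZERO));

        // A hypothetical 2 s skew would cover this one element.
        System.out.println(isAllowed(input, output, Duration.ofSeconds(2)));
    }
}
```

Under this model a larger allowed skew would only accept this particular element. It does not give the source the timestamps it needs for the watermark, which is what KafkaIO#withTimestampFn is for.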
