Hi Chengzhi,

I'm not completely sure where or how the timestamp is set for a ProcessContext object. The error is thrown from this file in the Apache Beam repo: https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java. It compares your output timestamp against `elem.getTimestamp()`, where `elem` is the WindowedValue of the current input element.
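The check in SimpleDoFnRunner comes down to one comparison: reject the output if its timestamp is earlier than the input timestamp minus the allowed skew. Below is a stdlib-only sketch of that comparison, fed with the values from the error in this thread. `TimestampSkewCheck` and `checkOutputTimestamp` are hypothetical names, not Beam APIs, and it uses `java.time` instead of the Joda-Time types Beam uses. That is why the skew prints as `PT0S` rather than "0 milliseconds".

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, stdlib-only sketch of the output-timestamp check; not Beam code.
public class TimestampSkewCheck {

    // Rejects an output timestamp earlier than (input timestamp - allowed skew).
    static void checkOutputTimestamp(Instant output, Instant input, Duration allowedSkew) {
        if (output.isBefore(input.minus(allowedSkew))) {
            throw new IllegalArgumentException(String.format(
                "Cannot output with timestamp %s. Output timestamps must be no earlier than"
                    + " the timestamp of the current input (%s) minus the allowed skew (%s).",
                output, input, allowedSkew));
        }
    }

    public static void main(String[] args) {
        // Timestamp of the current input element, copied from the error message.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        // Event time parsed from the first line of the file.
        Instant output = Instant.ofEpochMilli(Long.parseLong("1565958615120"));
        System.out.println(output); // 2019-08-16T12:30:15.120Z

        try {
            // Zero skew, as in the original pipeline: the output is rejected.
            checkOutputTimestamp(output, input, Duration.ZERO);
            System.out.println("accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // With a 10-minute skew the lower bound is 12:29:06.887, so 12:30:15.120 passes.
        checkOutputTimestamp(output, input, Duration.ofMinutes(10));
        System.out.println("accepted with 10-minute skew");
    }
}
```

Widening the skew only moves the lower bound. It does not change how downstream windowing treats the older timestamp.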
I am thinking +dev@beam.apache.org <dev@beam.apache.org> can offer some insight. I would be interested to find out more myself.

-Theo

On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:

> Hi Theodore,
>
> Thanks for your reply. This is just a simple example I wrote to
> understand how event time works in Beam. My real records could have more
> fields, each with its own event time, so I tried to tell Beam which field
> holds the event time to use for later windowing and computation.
>
> The probable reason you mentioned sounds reasonable. I am still trying to
> figure out where the "current input (2019-08-16T12:39:06.887Z)" in the
> error message is coming from, if you have any insight on it.
>
> Thanks a lot for your help.
>
> -- Chengzhi
>
> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <theo...@google.com> wrote:
>
>> Hi Chengzhi,
>>
>> Are you simply trying to emit the timestamp onward? Why not just use
>> `out.output` with a PCollection<Instant>?
>>
>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>   @DoFn.ProcessElement
>>   public void processElement(@Element String line, OutputReceiver<Instant> out) {
>>     out.output(new Instant(Long.parseLong(line)));
>>   }
>> }
>>
>> You can also output the line itself as a PCollection<String>. If your line
>> has additional information to parse, consider a key-value pair (KV),
>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html,
>> where you can emit both some parsed content of the string and the timestamp.
>>
>> The probable reason outputWithTimestamp rejects older timestamps is that
>> the emitted timestamp is what windowing uses, in streaming pipelines in
>> particular, to decide which window each record belongs to for aggregations.
>>
>> -Theo
>>
>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <w.zhaocheng...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I am new to Beam and am trying out some examples. I am running Beam
>>> 2.14 with the Direct runner to read some files (which I continuously
>>> generate).
>>>
>>> I am facing this error: Cannot output with timestamp
>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>>> skew (0 milliseconds). I searched online but still don't quite understand
>>> it, so I am asking here for some help.
>>>
>>> A file has some past timestamps in it:
>>> 1565958615120
>>> 1565958615120
>>> 1565958615121
>>>
>>> My code looks something like this:
>>>
>>> static class ReadWithEventTime extends DoFn<String, String> {
>>>   @ProcessElement
>>>   public void processElement(@Element String line, OutputReceiver<String> out) {
>>>     out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>   }
>>> }
>>>
>>> public static void main(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>
>>>   String sourcePath = new File("files/").getPath();
>>>
>>>   PCollection<String> data = pipeline.apply("ReadData",
>>>       TextIO.read().from(sourcePath + "/test*")
>>>           .watchForNewFiles(Duration.standardSeconds(5),
>>>               Watch.Growth.<String>never()));
>>>
>>>   data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>
>>>   pipeline.run().waitUntilFinish();
>>> }
>>>
>>> I am trying to understand where the "current input
>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it
>>> the lowest watermark when I start my application? If that's the case, is
>>> there a way that I can change the initial watermark?
>>>
>>> Also, I could set `withAllowedTimestampSkew`, but it looks like it has
>>> been deprecated.
>>>
>>> Any suggestion would be appreciated. Thank you!
>>>
>>> Best,
>>> Chengzhi
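On the watermark question: the file's event times (12:30) are about nine minutes older than the input timestamp (12:39). That gap suggests the input timestamp was assigned when the file was picked up, not taken from the data. This is an inference from the numbers, not something confirmed in this thread. It also suggests why a DoFn may not move timestamps backward. Assume, as a worst case, that the downstream watermark has already reached the input element's timestamp. An output stamped earlier could then land in a window that has already closed. The toy model below illustrates this with epoch-aligned one-minute fixed windows and zero allowed lateness. `LateDataToyModel`, `windowStart` and `isLate` are hypothetical names. This is not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model of event-time windowing (hypothetical, not Beam code).
// Assumes fixed, epoch-aligned windows and zero allowed lateness.
public class LateDataToyModel {

    // Start of the fixed window of the given size that contains t.
    static Instant windowStart(Instant t, Duration size) {
        long ms = t.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, size.toMillis()));
    }

    // With zero allowed lateness, an element is late once the watermark
    // has reached the end of the element's window.
    static boolean isLate(Instant eventTime, Duration size, Instant watermark) {
        Instant windowEnd = windowStart(eventTime, size).plus(size);
        return !windowEnd.isAfter(watermark);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        // Assumption: the watermark has advanced as far as the input element's timestamp.
        Instant watermark = Instant.parse("2019-08-16T12:39:06.887Z");
        // Event time from the file, i.e. the timestamp the DoFn tried to move back to.
        Instant eventTime = Instant.ofEpochMilli(1565958615120L);

        System.out.println("window start: " + windowStart(eventTime, size)); // 2019-08-16T12:30:00Z
        System.out.println("late: " + isLate(eventTime, size, watermark)); // true
        System.out.println("input timestamp late: " + isLate(watermark, size, watermark)); // false
    }
}
```

Under these assumptions, the record stamped 12:30:15.120 would fall into the already-closed window [12:30, 12:31). The skew check guards against exactly that. Theo's suggestion of carrying the event time as a field (for example in a KV) avoids reassigning the element timestamp inside the DoFn.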