The original timestamps are probably being assigned in the watchForNewFiles transform, which is also setting the watermark:
https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668

Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it probably makes sense to be able to customize the lag here.

On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
>
> Hi Theodore,
>
> Thanks again for your insight and help. I'd like to learn more from
> +dev@beam.apache.org about how the timestamp in the WindowedValue gets
> set initially.
>
> -Chengzhi
>
> On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu <theo...@google.com> wrote:
>>
>> Hi Chengzhi,
>>
>> I'm not completely sure where/how the timestamp is set for a ProcessContext
>> object. Here is the code in the Apache Beam repo that raises the error:
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
>> which makes reference to `elem.getTimestamp()`, where elem is a WindowedValue.
>>
>> I am thinking +dev@beam.apache.org can offer some insight. I would be
>> interested to find out more myself.
>>
>> -Theo
>>
>> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <w.zhaocheng...@gmail.com>
>> wrote:
>>>
>>> Hi Theodore,
>>>
>>> Thanks for your reply. This is just a simple example I wrote to
>>> understand how event time works in Beam. I could have more fields, and I
>>> would have an event time for each record, so I tried to let Beam know
>>> which field is the event time to use for later windowing and computation.
>>>
>>> I think the probable reason you mentioned sounds reasonable. I am still
>>> trying to figure out where "current input (2019-08-16T12:39:06.887Z)" in
>>> the error message is coming from, if you have any insight on it.
>>>
>>> Thanks a lot for your help.
>>>
>>> -- Chengzhi
>>>
>>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <theo...@google.com> wrote:
>>>>
>>>> Hi Chengzhi,
>>>>
>>>> Are you simply trying to emit the timestamp onward?
>>>> Why not just use
>>>> `out.output` with a PCollection<Instant>?
>>>>
>>>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>>>   @DoFn.ProcessElement
>>>>   public void processElement(@Element String line,
>>>>       OutputReceiver<Instant> out) {
>>>>     out.output(new Instant(Long.parseLong(line)));
>>>>   }
>>>> }
>>>>
>>>> You can also output the line itself as a PCollection<String>. If your line
>>>> has additional information to parse, consider a key-value pair
>>>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>>>> where you can emit both some parsed content of the string and the
>>>> timestamp.
>>>>
>>>> The probable reason why outputWithTimestamp doesn't work with older times
>>>> is that the emitted timestamp is used specifically for windowing, and in
>>>> streaming data pipelines it determines which window each record belongs
>>>> to for aggregations.
>>>>
>>>> -Theo
>>>>
>>>>
>>>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <w.zhaocheng...@gmail.com>
>>>> wrote:
>>>>>
>>>>> Hi folks,
>>>>>
>>>>> I am new to Beam and trying to play with some examples. I am running
>>>>> Beam 2.14 with the Direct runner to read some files (which I generate
>>>>> continuously).
>>>>>
>>>>> I am facing this error: Cannot output with timestamp
>>>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the
>>>>> allowed skew (0 milliseconds). I searched online but still don't quite
>>>>> understand it, so I am asking here for help.
>>>>>
>>>>> A file has some past timestamps in it:
>>>>> 1565958615120
>>>>> 1565958615120
>>>>> 1565958615121
>>>>>
>>>>> My code looks something like this:
>>>>>
>>>>> static class ReadWithEventTime extends DoFn<String, String> {
>>>>>   @ProcessElement
>>>>>   public void processElement(@Element String line,
>>>>>       OutputReceiver<String> out) {
>>>>>     out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>>>   }
>>>>> }
>>>>>
>>>>> public static void main(String[] args) {
>>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>>   String sourcePath = new File("files/").getPath();
>>>>>
>>>>>   PCollection<String> data = pipeline.apply("ReadData",
>>>>>       TextIO.read().from(sourcePath + "/test*")
>>>>>           .watchForNewFiles(Duration.standardSeconds(5),
>>>>>               Watch.Growth.<String>never()));
>>>>>
>>>>>   data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>>>
>>>>>   pipeline.run().waitUntilFinish();
>>>>> }
>>>>>
>>>>> I am trying to understand where "current input (2019-08-16T12:39:06.887Z)"
>>>>> in the error message is coming from. Is it the lowest watermark when I
>>>>> start my application? If that's the case, is there a way that I can
>>>>> change the initial watermark?
>>>>>
>>>>> Also, I can set up `withAllowedTimestampSkew`, but it looks like it has
>>>>> been deprecated.
>>>>>
>>>>> Any suggestion would be appreciated. Thank you!
>>>>>
>>>>> Best,
>>>>> Chengzhi
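For reference, here is a small stdlib-only Java model of the behavior discussed in this thread. It is not Beam code. It assumes, as the top reply suggests, that watchForNewFiles stamps each discovered file (and so every line TextIO reads from it) with the wall-clock time of the poll, and it re-implements only the rule stated in the error message. The class and method names (`SkewCheckSketch`, `discoveryTimestamp`, `checkTimestamp`, `accepted`) are hypothetical, as is the `lag` parameter, which stands in for the customizable lag proposed in the top reply. It uses java.time rather than the Joda-Time types Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

// Stdlib-only model of the timestamp check behind the error in this thread.
// NOT Beam code: all names here are hypothetical and only mirror the error text.
class SkewCheckSketch {

    // Hypothetical stand-in for the timestamp assigned when a file is discovered:
    // the wall-clock time of the poll, moved back by an (assumed) configurable lag.
    static Instant discoveryTimestamp(Instant pollTime, Duration lag) {
        return pollTime.minus(lag);
    }

    // Mirrors the rule quoted in the error: an output timestamp may not be earlier
    // than the input element's timestamp minus the allowed skew.
    static void checkTimestamp(Instant input, Instant output, Duration allowedSkew) {
        if (output.isBefore(input.minus(allowedSkew))) {
            throw new IllegalArgumentException(String.format(
                "Cannot output with timestamp %s. Output timestamps must be no earlier than"
                    + " the timestamp of the current input (%s) minus the allowed skew (%d milliseconds).",
                output, input, allowedSkew.toMillis()));
        }
    }

    // Convenience wrapper: true if the output timestamp passes the check.
    static boolean accepted(Instant input, Instant output, Duration allowedSkew) {
        try {
            checkTimestamp(input, output, allowedSkew);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Instant pollTime = Instant.parse("2019-08-16T12:39:06.887Z"); // "current input" in the error
        Instant eventTime = Instant.ofEpochMilli(1565958615120L);     // first line of the test file
        System.out.println(eventTime); // 2019-08-16T12:30:15.120Z

        // No lag, no skew: reproduces the failure reported in the thread.
        System.out.println(accepted(discoveryTimestamp(pollTime, Duration.ZERO), eventTime, Duration.ZERO)); // false
        // A ten-minute skew on the check accepts the record.
        System.out.println(accepted(discoveryTimestamp(pollTime, Duration.ZERO), eventTime, Duration.ofMinutes(10))); // true
        // So does a ten-minute lag on the discovery timestamp with zero skew.
        System.out.println(accepted(discoveryTimestamp(pollTime, Duration.ofMinutes(10)), eventTime, Duration.ZERO)); // true
    }
}
```

With the values from the error message, the record's event time is about 8 minutes 52 seconds older than the timestamp of the element it was parsed from, so a zero skew rejects it, while either a ten-minute skew or a ten-minute lag on the assigned timestamp accepts it. In Beam itself the skew is what the deprecated DoFn#getAllowedTimestampSkew() returns; relaxing it lets outputs move behind the input timestamps, which can make them late relative to the watermark.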