Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"
Hi Robert,

Thanks for the information; that explains the behavior I noticed. I guess my current solution would be either to shift the watermark somehow, or to start the streaming process before any files come in so that the initial watermark settles down. I will keep watching the JIRA you shared. Thanks for the insights.

-Chengzhi

On Tue, Aug 20, 2019 at 4:53 PM Robert Bradshaw wrote:
> The original timestamps are probably being assigned in the
> watchForNewFiles transform, which is also setting the watermark:
>
> https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668
>
> Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
> probably makes sense to be able to customize the lag here.
Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"
The original timestamps are probably being assigned in the watchForNewFiles transform, which is also setting the watermark:

https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668

Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it probably makes sense to be able to customize the lag here.

On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao wrote:
> Hi Theodore,
>
> Thanks again for your insight and help. I'd like to learn more from
> +dev@beam.apache.org about how the timestamp on the WindowedValue is
> assigned in the first place.
>
> -Chengzhi
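To make the size of the gap concrete, here is a minimal plain-Java sketch that parses the sample lines from the original question and measures how far each event time lies behind the input timestamp reported in the error message. It uses `java.time` rather than the Joda-Time `Instant` that Beam uses, and the class and method names (`EventTimeLag`, `parseEventTime`, `lagBehind`) are hypothetical, chosen only for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class EventTimeLag {
    // Parse a line holding epoch milliseconds into an Instant.
    static Instant parseEventTime(String line) {
        return Instant.ofEpochMilli(Long.parseLong(line.trim()));
    }

    // How far the event time lies behind the timestamp already attached
    // to the input element (positive when the event time is older).
    static Duration lagBehind(Instant inputTimestamp, Instant eventTime) {
        return Duration.between(eventTime, inputTimestamp);
    }

    public static void main(String[] args) {
        // Input timestamp copied from the error message in this thread.
        Instant inputTimestamp = Instant.parse("2019-08-16T12:39:06.887Z");
        // Sample file contents copied from the original question.
        for (String line : List.of("1565958615120", "1565958615120", "1565958615121")) {
            Instant eventTime = parseEventTime(line);
            long lagMillis = lagBehind(inputTimestamp, eventTime).toMillis();
            System.out.println(eventTime + " lags the input timestamp by " + lagMillis + " ms");
        }
    }
}
```

The first line of the file parses to 2019-08-16T12:30:15.120Z, which is 531767 ms (a little under nine minutes) behind the input timestamp, so any lag or skew setting would have to cover at least that much for these records.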
Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"
Hi Theodore,

Thanks again for your insight and help. I'd like to learn more from +dev@beam.apache.org about how the timestamp on the WindowedValue is assigned in the first place.

-Chengzhi

On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu wrote:
> Hi Chengzhi,
>
> I'm not completely sure where or how the timestamp is set for a
> ProcessContext object. Here is the code in the Apache Beam repo that
> raises the error:
>
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
>
> It makes reference to `elem.getTimestamp()`, where elem is a
> WindowedValue.
>
> I am thinking +dev@beam.apache.org can offer some insight. I would be
> interested to find out more myself.
>
> -Theo
Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"
Hi Chengzhi,

I'm not completely sure where or how the timestamp is set for a ProcessContext object. Here is the code in the Apache Beam repo that raises the error:

https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

It makes reference to `elem.getTimestamp()`, where elem is a WindowedValue.

I am thinking +dev@beam.apache.org can offer some insight. I would be interested to find out more myself.

-Theo

On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao wrote:
> Hi Theodore,
>
> Thanks for your reply. This is just a simple example that I wrote to
> understand how event time works in Beam. I could have more fields, and I
> would have an event time for each record, so I tried to let Beam know
> which field is the event time to use for later windowing and computation.
>
> I think the probable reason you mentioned sounds reasonable. I am still
> trying to figure out where the "current input (2019-08-16T12:39:06.887Z)"
> in the error message is coming from, if you have any insight on it.
>
> Thanks a lot for your help.
>
> -- Chengzhi
>
> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu wrote:
>> Hi Chengzhi,
>>
>> Are you simply trying to emit the timestamp onward? Why not just use
>> `out.output` with a PCollection<Instant>?
>>
>>   static class ReadWithEventTime extends DoFn<String, Instant> {
>>     @DoFn.ProcessElement
>>     public void processElement(@Element String line,
>>         OutputReceiver<Instant> out) {
>>       // Emit the parsed epoch-millisecond value as the element itself.
>>       out.output(new Instant(Long.parseLong(line)));
>>     }
>>   }
>>
>> You can also output the line itself as a PCollection<String>. If your
>> line has additional information to parse, consider a key-value pair (KV)
>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>> where you can emit both some parsed content of the string and the
>> timestamp.
>>
>> The probable reason why outputWithTimestamp doesn't work with older
>> times is that the emitted timestamp is used specifically for windowing:
>> streaming data pipelines use it to determine which window each record
>> belongs to for aggregations.
>>
>> -Theo
>>
>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao wrote:
>>> Hi folks,
>>>
>>> I am new to Beam and am trying to play with some examples. I am running
>>> Beam 2.14 with the Direct runner to read some files (which I generate
>>> continuously).
>>>
>>> I am facing this error: Cannot output with timestamp
>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the
>>> allowed skew (0 milliseconds). I searched online but still don't quite
>>> understand it, so I am asking here for some help.
>>>
>>> A file has some past timestamps in it:
>>> 1565958615120
>>> 1565958615120
>>> 1565958615121
>>>
>>> My code looks something like this:
>>>
>>>   static class ReadWithEventTime extends DoFn<String, String> {
>>>     @ProcessElement
>>>     public void processElement(@Element String line,
>>>         OutputReceiver<String> out) {
>>>       // Use the epoch-millisecond value in the line as the event time.
>>>       out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>     }
>>>   }
>>>
>>>   public static void main(String[] args) {
>>>     PipelineOptions options = PipelineOptionsFactory.create();
>>>     Pipeline pipeline = Pipeline.create(options);
>>>
>>>     String sourcePath = new File("files/").getPath();
>>>
>>>     PCollection<String> data = pipeline.apply("ReadData",
>>>         TextIO.read().from(sourcePath + "/test*")
>>>             .watchForNewFiles(Duration.standardSeconds(5),
>>>                 Watch.Growth.never()));
>>>
>>>     data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>
>>>     pipeline.run().waitUntilFinish();
>>>   }
>>>
>>> I am trying to understand where the "current input
>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it
>>> the lowest watermark when I start my application? If that's the case, is
>>> there a way that I can change the initial watermark?
>>>
>>> Also, I can set up `withAllowedTimestampSkew`, but it looks like it has
>>> been deprecated.
>>>
>>> Any suggestion would be appreciated. Thank you!
>>>
>>> Best,
>>> Chengzhi
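The rule stated in the error message can be sketched in plain Java as follows. This is a paraphrase of the wording of the message, not a copy of the code in SimpleDoFnRunner; the class and method names (`TimestampSkewCheck`, `isAllowed`, `checkTimestamp`) are hypothetical, and `java.time` stands in for the Joda-Time types that Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSkewCheck {
    // True when the output timestamp is no earlier than
    // (input timestamp - allowed skew).
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    // Throws with a message modelled on the one quoted in this thread.
    static void checkTimestamp(Instant output, Instant input, Duration allowedSkew) {
        if (!isAllowed(output, input, allowedSkew)) {
            throw new IllegalArgumentException(String.format(
                "Cannot output with timestamp %s. Output timestamps must be no earlier "
                    + "than the timestamp of the current input (%s) minus the allowed "
                    + "skew (%d milliseconds).",
                output, input, allowedSkew.toMillis()));
        }
    }

    public static void main(String[] args) {
        // Values copied from the question: the first line of the file and
        // the input timestamp shown in the error message.
        Instant output = Instant.ofEpochMilli(1565958615120L);
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");

        System.out.println(isAllowed(output, input, Duration.ZERO));          // false
        System.out.println(isAllowed(output, input, Duration.ofMinutes(10))); // true
    }
}
```

With zero skew, the output timestamp of 12:30:15.120Z is rejected because it is earlier than the input timestamp of 12:39:06.887Z. A skew of ten minutes would accept it, which is why the allowed-skew setting comes up in the question.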