Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-20 Thread Chengzhi Zhao
Hi Robert,

Thanks for the information; that explains the behavior I noticed. I guess
my current workaround would be to somehow shift the watermark, or to start
the streaming process before any files arrive so that the initial watermark
is already established.

I will keep watching the JIRA you shared, thanks for the insights.

-Chengzhi


Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-20 Thread Robert Bradshaw
The original timestamps are probably being assigned in the
watchForNewFiles transform, which is also setting the watermark:

https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668

Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
probably makes sense to be able to customize the lag here.


Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-16 Thread Chengzhi Zhao
Hi Theodore,

Thanks again for your insight and help. I'd like to learn more about how the
timestamp on the WindowedValue gets set initially, so I'm adding
+dev@beam.apache.org.


-Chengzhi



Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-16 Thread Theodore Siu
Hi Chengzhi,

I'm not completely sure where or how the timestamp is set for a ProcessContext
object. The code that raises this error is in the Apache Beam repo:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
It refers to `elem.getTimestamp()`, where elem is a WindowedValue.
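
To make the rule in the error message concrete, here is a minimal sketch in
plain Java. It is not Beam's actual source: it uses java.time instead of the
Joda-Time types Beam uses, and the class and method names (TimestampSkewCheck,
isAllowed, requiredSkew) are made up for illustration. It only assumes the
rule is exactly what the message states: an output timestamp must be no
earlier than the input timestamp minus the allowed skew.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustration only: restates the rule from the error message, not Beam's code. */
public class TimestampSkewCheck {

  /** True if `output` is no earlier than `input` minus `allowedSkew`. */
  public static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
    return !output.isBefore(input.minus(allowedSkew));
  }

  /** Smallest skew that would make `output` acceptable (zero if none is needed). */
  public static Duration requiredSkew(Instant output, Instant input) {
    Duration gap = Duration.between(output, input);
    return gap.isNegative() ? Duration.ZERO : gap;
  }

  public static void main(String[] args) {
    // Value parsed from the sample file (epoch milliseconds).
    Instant output = Instant.ofEpochMilli(1565958615120L);
    // "Current input" timestamp quoted in the error message.
    Instant input = Instant.parse("2019-08-16T12:39:06.887Z");

    System.out.println(output);
    System.out.println(isAllowed(output, input, Duration.ZERO));
    System.out.println(requiredSkew(output, input).toMillis());
  }
}
```

With the two timestamps from the error message, the check fails at zero skew,
and the gap between them is 531767 ms (roughly 8 minutes 52 seconds).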

I think +dev@beam.apache.org can offer some insight. I would be interested
to find out more myself.

-Theo

On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao 
wrote:

> Hi Theodore,
>
> Thanks for your reply. This is just a simple example that I am using to
> understand how event time works in Beam. My real data could have more
> fields, with an event time for each record, so I am trying to tell Beam
> which field is the event time to use for later windowing and computation.
>
> The probable reason you mentioned sounds reasonable. I am still trying to
> figure out where the "current input (2019-08-16T12:39:06.887Z)" in the
> error message is coming from, if you have any insight on it.
>
> Thanks a lot for your help.
>
> -- Chengzhi
>
> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu  wrote:
>
>> Hi Chengzhi,
>>
>> Are you simply trying to emit the timestamp onward? Why not just use
>> `out.output` with a PCollection<Instant>?
>>
>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>   @DoFn.ProcessElement
>>   public void processElement(@Element String line, OutputReceiver<Instant> out) {
>>     // Parse the line as epoch milliseconds and emit it as a Joda-Time Instant.
>>     out.output(new Instant(Long.parseLong(line)));
>>   }
>> }
>>
>> You can also output the line itself as a PCollection<String>. If your line
>> has additional information to parse, consider a key-value pair (KV)
>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>> where you can emit both some parsed content of the string and the timestamp.
>>
>> The probable reason why outputWithTimestamp doesn't work with older times
>> is that the emitted timestamp is used specifically for windowing: streaming
>> pipelines use it to determine which window each record belongs to for
>> aggregations.
>>
>> -Theo
>>
>>
>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao 
>> wrote:
>>
>>> Hi folks,
>>>
>>> I am new to Beam and am trying out some examples. I am running Beam 2.14
>>> with the Direct runner to read some files (which I keep generating).
>>>
>>> I am facing this error: Cannot output with timestamp
>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>>> skew (0 milliseconds). I searched online but still don't quite understand
>>> it so I am asking here for some help.
>>>
>>> A file contains some past timestamps:
>>> 1565958615120
>>> 1565958615120
>>> 1565958615121
>>>
>>> My code looks something like this:
>>>
>>> static class ReadWithEventTime extends DoFn<String, String> {
>>>   @ProcessElement
>>>   public void processElement(@Element String line, OutputReceiver<String> out) {
>>>     // Use the epoch-millisecond value in the line as the element's event time.
>>>     out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>   }
>>> }
>>>
>>> public static void main(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>
>>>   String sourcePath = new File("files/").getPath();
>>>
>>>   PCollection<String> data = pipeline.apply("ReadData",
>>>       TextIO.read().from(sourcePath + "/test*")
>>>           .watchForNewFiles(Duration.standardSeconds(5), Watch.Growth.never()));
>>>
>>>   data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>
>>>   pipeline.run().waitUntilFinish();
>>> }
>>>
>>>
>>> I am trying to understand where the "current input
>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it the
>>> lowest watermark when I start my application? If that's the case, is there
>>> a way that I can change the initial watermark?
>>>
>>> Also, I can set up `withAllowedTimestampSkew`, but it looks like it has
>>> been deprecated.
>>>
>>> Any suggestion would be appreciated. Thank you!
>>>
>>> Best,
>>> Chengzhi
>>>
>>>
>>