The original timestamps are probably being assigned in the
watchForNewFiles transform, which is also setting the watermark:

https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668

Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
probably makes sense to be able to customize the lag here.
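
To make the rule in the error message concrete, here is a minimal sketch
that uses only java.time. It is not Beam's actual implementation; the class
and method names (TimestampSkewCheck, isAllowed, requiredSkew) are made up
for illustration. It plugs in the two timestamps from the report below and
shows why a skew of 0 ms rejects the output.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSkewCheck {

    // Mirrors the rule quoted in the error message: an output timestamp is
    // accepted only if it is no earlier than (input timestamp - allowed skew).
    // Illustration only; this is not Beam's code.
    static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    // Smallest skew that would let the given output timestamp through.
    static Duration requiredSkew(Instant input, Instant output) {
        Duration gap = Duration.between(output, input);
        return gap.isNegative() ? Duration.ZERO : gap;
    }

    public static void main(String[] args) {
        // "Current input" timestamp taken from the error message.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        // First line of the sample file, interpreted as epoch milliseconds.
        Instant output = Instant.ofEpochMilli(1565958615120L);

        System.out.println(output);                                  // 2019-08-16T12:30:15.120Z
        System.out.println(isAllowed(input, output, Duration.ZERO)); // false
        System.out.println(requiredSkew(input, output).toMillis());  // 531767
        System.out.println(isAllowed(input, output, requiredSkew(input, output))); // true
    }
}
```

In other words, the record's event time is about 8 minutes 52 seconds older
than the timestamp already attached to the input element, so it fails the
check unless the allowed skew is at least that large.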

On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao <w.zhaocheng...@gmail.com> wrote:
>
> Hi Theodore,
>
> Thanks again for your insight and help. I'd like to learn more from 
> +dev@beam.apache.org about how the timestamp on the WindowedValue is set 
> initially.
>
> -Chengzhi
>
> On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu <theo...@google.com> wrote:
>>
>> Hi Chengzhi,
>>
>> I'm not completely sure where/how the timestamp is set for a ProcessContext 
>> object. The code that raises the error is in the Apache Beam repo:
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
>> It references `elem.getTimestamp()`, where elem is a WindowedValue.
>>
>> I am thinking +dev@beam.apache.org can offer some insight. Would be 
>> interested to find out more myself.
>>
>> -Theo
>>
>> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <w.zhaocheng...@gmail.com> 
>> wrote:
>>>
>>> Hi Theodore,
>>>
>>> Thanks for your reply. This is just a simple example that I am using to 
>>> understand how event time works in Beam. I could have more fields, and I 
>>> would have an event time for each record, so I tried to let Beam know 
>>> which field is the event time to use for later windowing and computation.
>>>
>>> I think the probable reason you mentioned sounds reasonable. I am still 
>>> trying to figure out where the "current input 
>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from, if you 
>>> have any insight on it.
>>>
>>> Thanks a lot for your help.
>>>
>>> -- Chengzhi
>>>
>>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <theo...@google.com> wrote:
>>>>
>>>> Hi Chengzhi,
>>>>
>>>> Are you simply trying to emit the timestamp onward? Why not just use 
>>>> `out.output` with a PCollection<Instant>?
>>>>
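>>>> // Imports assumed: org.apache.beam.sdk.transforms.DoFn, org.joda.time.Instant
>>>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>>>     @DoFn.ProcessElement
>>>>     public void processElement(@Element String line, OutputReceiver<Instant> out) {
>>>>         // Parse the line as epoch milliseconds and emit it as the element value.
>>>>         out.output(new Instant(Long.parseLong(line)));
>>>>     }
>>>> }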
>>>>
>>>> You can also output the line itself as a PCollection<String>. If your 
>>>> line has additional information to parse, consider a KV (key-value pair) 
>>>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>>>> where you can emit both some parsed content of the string and the 
>>>> timestamp.
>>>>
>>>> The probable reason why outputWithTimestamp doesn't work with older times 
>>>> is that the emitted timestamp is used specifically for windowing: in 
>>>> streaming data pipelines it determines which window each record belongs 
>>>> to for aggregations.
>>>>
>>>> -Theo
>>>>
>>>>
>>>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <w.zhaocheng...@gmail.com> 
>>>> wrote:
>>>>>
>>>>> Hi folks,
>>>>>
>>>>> I am new to Beam and am trying out some examples. I am running Beam 
>>>>> 2.14 with the Direct runner to read some files (which I generate 
>>>>> continuously).
>>>>>
>>>>> I am facing this error: Cannot output with timestamp 
>>>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the 
>>>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the 
>>>>> allowed skew (0 milliseconds). I searched online but still don't quite 
>>>>> understand it so I am asking here for some help.
>>>>>
>>>>> A file has some past timestamp in it:
>>>>> 1565958615120
>>>>> 1565958615120
>>>>> 1565958615121
>>>>>
>>>>> My code looks something like this:
>>>>>
>>>>>     static class ReadWithEventTime extends DoFn<String, String> {
>>>>>         @ProcessElement
>>>>>         public void processElement(@Element String line, OutputReceiver<String> out) {
>>>>>             out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>         Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>>         String sourcePath = new File("files/").getPath();
>>>>>
>>>>>         PCollection<String> data = pipeline.apply("ReadData",
>>>>>                 TextIO.read().from(sourcePath + "/test*")
>>>>>                         .watchForNewFiles(Duration.standardSeconds(5), Watch.Growth.<String>never()));
>>>>>
>>>>>         data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>>>
>>>>>         pipeline.run().waitUntilFinish();
>>>>>
>>>>>     }
>>>>>
>>>>>
>>>>> I am trying to understand where the "current input 
>>>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it 
>>>>> the lowest watermark when I start my application? If that's the case, 
>>>>> is there a way that I can change the initial watermark?
>>>>>
>>>>> Also, I can set `withAllowedTimestampSkew`, but it looks like it has 
>>>>> been deprecated.
>>>>>
>>>>> Any suggestion would be appreciated. Thank you!
>>>>>
>>>>> Best,
>>>>> Chengzhi
>>>>>
