Hi Chengzhi,

I'm not completely sure where or how the timestamp is set for a ProcessContext
object. Here is where the error is thrown in the Apache Beam repo:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
It compares the output timestamp against `elem.getTimestamp()`, where elem is
the WindowedValue currently being processed.
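To make the check concrete, here is a minimal plain-Java model of it. This is not Beam code: the class and method names are hypothetical, it uses java.time instead of the Joda-Time types Beam uses, and it only mirrors the rule stated in the error message (the output timestamp must be no earlier than the input timestamp minus the allowed skew).

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of the output-timestamp check described in
 * the error message. Not Beam's implementation.
 */
final class OutputTimestampCheck {
    private OutputTimestampCheck() {}

    /** True if output >= input - allowedSkew. */
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    /** Throws with a message shaped like the one reported in this thread. */
    static void check(Instant output, Instant input, Duration allowedSkew) {
        if (!isAllowed(output, input, allowedSkew)) {
            throw new IllegalArgumentException(String.format(
                "Cannot output with timestamp %s. Output timestamps must be no earlier than "
                    + "the timestamp of the current input (%s) minus the allowed skew "
                    + "(%d milliseconds).",
                output, input, allowedSkew.toMillis()));
        }
    }
}
```

With the two timestamps from the error and zero skew, isAllowed returns false; the gap between them is 8m51.767s, so only an allowed skew at least that large (for example ten minutes) would let this particular output through.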

I'm adding dev@beam.apache.org, since folks there can likely offer some
insight. I'd be interested to find out more myself.

-Theo

On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <w.zhaocheng...@gmail.com>
wrote:

> Hi Theodore,
>
> Thanks for your reply. This is just a simple example I wrote to
> understand how event time works in Beam. I could have more fields, and
> each record would have its own event time, so I am trying to tell Beam
> which field holds the event time to use for later windowing and computation.
>
> The probable reason you mentioned sounds reasonable. I am still trying to
> figure out where the "current input (2019-08-16T12:39:06.887Z)" in the
> error message is coming from, if you have any insight on it.
>
> Thanks a lot for your help.
>
> -- Chengzhi
>
> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <theo...@google.com> wrote:
>
>> Hi Chengzhi,
>>
>> Are you simply trying to emit the timestamp onward? Why not just use
>> `out.output` with a PCollection<Instant>?
>>
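>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.joda.time.Instant;
>>
>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>     @DoFn.ProcessElement
>>     public void processElement(@Element String line, OutputReceiver<Instant> out) {
>>         // Emit the parsed epoch-millis value as the element itself;
>>         // the output keeps the timestamp the input element already had.
>>         out.output(new Instant(Long.parseLong(line)));
>>     }
>> }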
>>
>> You can also output the line itself as a PCollection<String>. If your line
>> has additional information to parse, consider a key-value pair (KV)
>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>> where you can emit both some parsed content of the string and the timestamp.
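A minimal sketch of the parsing side, assuming a hypothetical `payload,epochMillis` line format (the files in this thread contain only the timestamp). It uses java.util.Map.Entry as a plain-Java stand-in for Beam's KV so that it runs without Beam on the classpath; the class name is hypothetical.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical parser for lines of the form "payload,epochMillis". */
final class PayloadWithTimestamp {
    private PayloadWithTimestamp() {}

    /** Splits on the last comma, so the payload itself may contain commas. */
    static Map.Entry<String, Instant> parse(String line) {
        int comma = line.lastIndexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Expected payload,epochMillis but got: " + line);
        }
        String payload = line.substring(0, comma);
        Instant timestamp =
            Instant.ofEpochMilli(Long.parseLong(line.substring(comma + 1).trim()));
        return new AbstractMap.SimpleImmutableEntry<>(payload, timestamp);
    }
}
```

In the pipeline, the same pair would be emitted as KV.of(payload, timestamp) from a DoFn<String, KV<String, Instant>>, using Joda-Time's Instant instead of java.time.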
>>
>> The probable reason why outputWithTimestamp doesn't work with older times
>> is that the emitted timestamp is used specifically for windowing: in
>> streaming pipelines it determines which window each record belongs to for
>> aggregations, so shifting a record's timestamp backwards could make it late
>> relative to the watermark.
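As an illustration of how the event timestamp drives windowing, here is a simplified sketch of fixed (tumbling) window assignment with zero offset. It is a standalone model, not Beam's FixedWindows implementation, and the helper names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper illustrating fixed-window assignment by event time. */
final class FixedWindowSketch {
    private FixedWindowSketch() {}

    /** Start of the window of the given size that contains ts (zero offset). */
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        // floorMod keeps pre-epoch timestamps in the correct (earlier) window.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
    }

    /** Exclusive end of the window that contains ts. */
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }
}
```

For example, 1565958615120 (2019-08-16T12:30:15.120Z) falls in the one-minute window [12:30:00Z, 12:31:00Z), so the record's timestamp, not its arrival time, decides which aggregate it contributes to.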
>>
>> -Theo
>>
>>
>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <w.zhaocheng...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> I am new to Beam and am trying out some examples. I am running Beam
>>> 2.14 with the Direct runner to read some files that I continuously generate.
>>>
>>> I am facing this error: Cannot output with timestamp
>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>>> skew (0 milliseconds). I searched online but still don't quite understand
>>> it, so I am asking here for some help.
>>>
>>> Each file contains some past timestamps (epoch milliseconds), for example:
>>> 1565958615120
>>> 1565958615120
>>> 1565958615121
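Each value is milliseconds since the Unix epoch. A quick plain-Java check (hypothetical helper, using java.time rather than Beam's Joda-Time) shows that 1565958615120 is exactly the 2019-08-16T12:30:15.120Z in the error message, so the rejected output timestamp comes from the file contents.

```java
import java.time.Instant;

/** Hypothetical helper: parses one line of the test file as epoch milliseconds. */
final class EpochMillisLine {
    private EpochMillisLine() {}

    /** Assumes one number per line; surrounding whitespace is ignored. */
    static Instant parse(String line) {
        return Instant.ofEpochMilli(Long.parseLong(line.trim()));
    }
}
```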
>>>
>>> My code looks something like this:
>>>
>>> static class ReadWithEventTime extends DoFn<String, String> {
>>>     @ProcessElement
>>>     public void processElement(@Element String line, OutputReceiver<String> out) {
>>>         // Re-emit the line, using the epoch-millis value it contains as its event timestamp.
>>>         out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>     }
>>> }
>>>
>>>     public static void main(String[] args) {
>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>         Pipeline pipeline = Pipeline.create(options);
>>>
>>>         String sourcePath = new File("files/").getPath();
>>>
>>>         PCollection<String> data = pipeline.apply("ReadData",
>>>                 TextIO.read().from(sourcePath + "/test*")
>>>                         .watchForNewFiles(Duration.standardSeconds(5),
>>>                                 Watch.Growth.<String>never()));
>>>
>>>         data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>
>>>         pipeline.run().waitUntilFinish();
>>>
>>>     }
>>>
>>>
>>> I am trying to understand where the "current input
>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it the
>>> lowest watermark when I start my application? If that's the case, is there
>>> a way that I can change the initial watermark?
>>>
>>> Also, I could set up `withAllowedTimestampSkew`, but it looks like it has
>>> been deprecated.
>>>
>>> Any suggestion would be appreciated. Thank you!
>>>
>>> Best,
>>> Chengzhi
>>>
>>>
>>
