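getCurrentTimestamp returns the timestamp of the current element; it is not a watermark. Both bounded and unbounded readers have this method, and the bounded reader's default implementation returns the minimum possible timestamp.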
For a bounded source, this default is safe. The source watermark can be held at negative infinity while elements remain in the source and advanced to positive infinity after all elements are read. Elements can also be shifted arbitrarily forward in time later in the pipeline (for example, via a "WithTimestamps" transform or a DoFn that uses "outputWithTimestamp"). It is not safe to output elements at negative infinity when the watermark can advance while elements are still being read, as it does for unbounded sources. Those elements would fall behind the watermark and could be dropped.

On Fri, Apr 21, 2017 at 8:44 AM, Shen Li <cs.she...@gmail.com> wrote:
> Hi,
>
> A follow-up question. I found that the getWatermark() API is only available
> for UnboundedSource. BoundedSource provides a getCurrentTimestamp() API
> with comments "By default, returns the minimum possible timestamp", which
> sounds like a watermark. Any reason for the difference in method names?
>
> Shen
>
> On Thu, Apr 20, 2017 at 11:46 PM, Shen Li <cs.she...@gmail.com> wrote:
>
> > Thanks!
> >
> > Shen
> >
> > On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:
> >
> >> In KafkaIO, it's possible to provide customized watermark function, to
> >> control how to advance current watermark. I'm not familiar with other
> >> unbounded IOs, assume they should support it as getWatermark() is defined
> >> in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
> >>
> >> A quick example to hold watermark 10 seconds earlier than processing time,
> >> you can have more complex logic based on KafkaRecord content.
> >> KafkaIO.<>read()
> >>   .withWatermarkFn2(new SerializableFunction<KafkaRecord<String,String>, Instant>() {
> >>     @Override
> >>     public Instant apply(KafkaRecord<String, String> input) {
> >>       return new Instant().minus(Duration.standardSeconds(10));
> >>     }
> >>   }
> >>
> >> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
> >>
> >> > You want to use an existing source but just change the watermark tracking?
> >> >
> >> > You can't do this in your pipeline right now, but you could probably easily
> >> > wrap a source and proxy every method except getWatermark, though I have
> >> > never tried.
> >> >
> >> > The general feature that might address this is discussed a little on
> >> > https://issues.apache.org/jira/browse/BEAM-644
> >> >
> >> > There are also related ideas in the discussions about Splittable DoFn.
> >> >
> >> > Kenn
> >> >
> >> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > Can application developers provide classes/methods to specify how to
> >> > > generate watermarks from sources, and how to aggregate watermarks from
> >> > > multiple input PCollections? Say, emit at most 1 watermark per second, or
> >> > > create watermarks that are 5 seconds older than the latest tuple's
> >> > > timestamp?
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Shen
> >>
> >> --
> >> Mingmin
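The argument about the default minimum element timestamp can be made concrete with a toy model. Everything below is hypothetical and uses plain java.time rather than Beam classes. MIN_TS and MAX_TS stand in for Beam's minimum and maximum timestamps. The bounded watermark is modeled as a function, even though Beam's BoundedReader exposes no getWatermark(). An element is treated as late, and droppable, when its timestamp is behind the watermark.

```java
import java.time.Instant;
import java.util.List;

/**
 * Toy model (hypothetical, not Beam's API): why a default element timestamp of
 * "minimum possible" is harmless for a bounded source but not for an unbounded one.
 */
public class TimestampVsWatermark {

  // Stand-ins for the minimum/maximum timestamps; Beam defines its own constants.
  static final Instant MIN_TS = Instant.MIN;
  static final Instant MAX_TS = Instant.MAX;

  /** An element is late (and may be dropped) if its timestamp is behind the watermark. */
  static boolean isLate(Instant elementTs, Instant watermark) {
    return elementTs.isBefore(watermark);
  }

  /** Bounded source: watermark held at MIN_TS while elements remain, MAX_TS once all are read. */
  static Instant boundedWatermark(int remaining) {
    return remaining > 0 ? MIN_TS : MAX_TS;
  }

  /** Reads n elements, each stamped MIN_TS, and counts how many arrive behind the watermark. */
  static int lateInBoundedRead(int n) {
    int late = 0;
    for (int read = 0; read < n; read++) {
      // The current element and any after it are still unread, so the watermark is held.
      Instant watermark = boundedWatermark(n - read);
      if (isLate(MIN_TS, watermark)) {
        late++;
      }
    }
    return late;
  }

  /**
   * Unbounded source: the watermark advances while elements keep arriving. Given the
   * watermark in effect as each MIN_TS-stamped element is emitted, counts the late ones.
   */
  static int lateInUnboundedRead(List<Instant> watermarkPerElement) {
    int late = 0;
    for (Instant watermark : watermarkPerElement) {
      if (isLate(MIN_TS, watermark)) {
        late++;
      }
    }
    return late;
  }

  public static void main(String[] args) {
    System.out.println("bounded, late elements: " + lateInBoundedRead(3));
    System.out.println("bounded, final watermark: " + boundedWatermark(0));
    List<Instant> advancing = List.of(
        MIN_TS,
        Instant.parse("2017-04-21T08:44:00Z"),
        Instant.parse("2017-04-21T08:44:10Z"));
    System.out.println("unbounded, late elements: " + lateInUnboundedRead(advancing));
  }
}
```

Running main reports 0 late elements for the bounded read and 2 for the unbounded one. In the unbounded case, every element emitted after the watermark has moved past MIN_TS is behind it.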
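Kenn's suggestion of wrapping a source and proxying every method except getWatermark can be sketched outside Beam. The Reader interface, ListReader and LaggingWatermarkReader below are hypothetical stand-ins, not Beam's UnboundedSource.UnboundedReader. A real wrapper would also have to forward methods such as getCheckpointMark() and wrap the source that creates the reader. The overridden watermark follows Shen's second example policy: 5 seconds older than the latest element timestamp seen.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Sketch (hypothetical types, not Beam's API) of overriding only a reader's watermark. */
public class WatermarkOverrideSketch {

  /** Minimal reader contract, loosely modeled on an unbounded reader. */
  interface Reader<T> {
    boolean advance();
    T getCurrent();
    Instant getCurrentTimestamp();
    Instant getWatermark();
  }

  /** In-memory reader over (value, timestamp) pairs. */
  static final class ListReader implements Reader<String> {
    private final Iterator<Map.Entry<String, Instant>> it;
    private Map.Entry<String, Instant> current;

    ListReader(List<Map.Entry<String, Instant>> records) {
      this.it = records.iterator();
    }

    @Override public boolean advance() {
      if (!it.hasNext()) return false;
      current = it.next();
      return true;
    }
    @Override public String getCurrent() { return current.getKey(); }
    @Override public Instant getCurrentTimestamp() { return current.getValue(); }
    // The wrapped reader's own policy (ignored by the wrapper): the latest timestamp read.
    @Override public Instant getWatermark() {
      return current == null ? Instant.MIN : current.getValue();
    }
  }

  /** Proxies every call to the delegate except getWatermark. */
  static final class LaggingWatermarkReader<T> implements Reader<T> {
    private final Reader<T> delegate;
    private final Duration lag;
    private Instant maxSeen = Instant.MIN;

    LaggingWatermarkReader(Reader<T> delegate, Duration lag) {
      this.delegate = delegate;
      this.lag = lag;
    }

    @Override public boolean advance() {
      boolean ok = delegate.advance();
      if (ok && delegate.getCurrentTimestamp().isAfter(maxSeen)) {
        maxSeen = delegate.getCurrentTimestamp(); // track the latest element timestamp
      }
      return ok;
    }
    @Override public T getCurrent() { return delegate.getCurrent(); }
    @Override public Instant getCurrentTimestamp() { return delegate.getCurrentTimestamp(); }

    /** Watermark = latest element timestamp seen minus the lag (MIN until anything is read). */
    @Override public Instant getWatermark() {
      // Guard: Instant.MIN.minus(...) would underflow and throw.
      return maxSeen.equals(Instant.MIN) ? Instant.MIN : maxSeen.minus(lag);
    }
  }

  public static void main(String[] args) {
    Instant t0 = Instant.parse("2017-04-20T13:42:00Z");
    Reader<String> reader = new LaggingWatermarkReader<>(
        new ListReader(List.of(
            Map.entry("a", t0),
            Map.entry("b", t0.plusSeconds(8)),
            Map.entry("c", t0.plusSeconds(3)))),
        Duration.ofSeconds(5));
    while (reader.advance()) {
      System.out.println(reader.getCurrent() + " ts=" + reader.getCurrentTimestamp()
          + " watermark=" + reader.getWatermark());
    }
  }
}
```

Element c (t0 + 3 s) arrives after b (t0 + 8 s), but it is not behind the lagging watermark, which sits at exactly t0 + 3 s. With a lag under 5 seconds, c would be late. This event-time policy differs from the processing-time function in the quoted KafkaIO example.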