Thank you, Thomas. That clears up my confusion. :)

Shen

On Tue, Apr 25, 2017 at 7:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:

> getCurrentTimestamp returns the timestamp of the current element. Both
> Bounded and Unbounded Readers have this method.
>
> For a bounded source, this is safe - the source watermark can be held to
> negative infinity while elements remain in the source and advance to
> infinity after all elements are read, and elements can be arbitrarily
> shifted forwards in time later in the pipeline (for example, via a
> "WithTimestamps" transform or a DoFn that uses "outputWithTimestamp"). It's
> not safe to output elements at negative infinity when there is a watermark
> that may drop elements, as is the case for unbounded sources.
>
> On Fri, Apr 21, 2017 at 8:44 AM, Shen Li <cs.she...@gmail.com> wrote:
>
> > Hi,
> >
> > A follow-up question. I found that the getWatermark() API is only available
> > for UnboundedSource. BoundedSource provides a getCurrentTimestamp() API
> > with the comment "By default, returns the minimum possible timestamp", which
> > sounds like a watermark. Any reason for the difference in method names?
> >
> > Shen
> >
> > On Thu, Apr 20, 2017 at 11:46 PM, Shen Li <cs.she...@gmail.com> wrote:
> >
> > > Thanks!
> > >
> > > Shen
> > >
> > >
> > > On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:
> > >
> > >> In KafkaIO, it's possible to provide a customized watermark function to
> > >> control how the current watermark advances. I'm not familiar with other
> > >> unbounded IOs, but I assume they should support it, as getWatermark() is
> > >> defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
> > >>
> > >> A quick example that holds the watermark 10 seconds behind processing
> > >> time; you can add more complex logic based on the KafkaRecord content.
> > >> KafkaIO.<String, String>read()
> > >>     .withWatermarkFn2(
> > >>         new SerializableFunction<KafkaRecord<String, String>, Instant>() {
> > >>           @Override
> > >>           public Instant apply(KafkaRecord<String, String> input) {
> > >>             // Hold the watermark 10 seconds behind the current processing time.
> > >>             return new Instant().minus(Duration.standardSeconds(10));
> > >>           }
> > >>         })
> > >>
> > >>
> > >> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
> > >>
> > >> > You want to use an existing source but just change the watermark tracking?
> > >> > You can't do this in your pipeline right now, but you could probably easily
> > >> > wrap a source and proxy every method except getWatermark, though I have
> > >> > never tried.
> > >> >
> > >> > The general feature that might address this is discussed a little on
> > >> > https://issues.apache.org/jira/browse/BEAM-644
> > >> >
> > >> > There are also related ideas in the discussions about Splittable DoFn.
> > >> >
> > >> > Kenn
> > >> >
> > >> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
> > >> >
> > >> > > Hi,
> > >> > >
> > >> > > Can application developers provide classes/methods to specify how to
> > >> > > generate watermarks from sources, and how to aggregate watermarks from
> > >> > > multiple input PCollections? Say, emit at most 1 watermark per second, or
> > >> > > create watermarks that are 5 seconds older than the latest tuple's
> > >> > > timestamp?
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Shen
> > >>
> > >> --
> > >> ----
> > >> Mingmin
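
For readers of this thread, here is a minimal standalone sketch of the two policies from the original question: a watermark that trails the latest element timestamp by a fixed lag (5 seconds in the question) and advances at most once per interval (1 second in the question). It uses only java.time and does not call any Beam API. The class LaggingWatermark, its methods observe() and current(), and the constant MIN_WATERMARK are hypothetical names made up for illustration. Something like this could plausibly back a getWatermark() override in a wrapped reader, as suggested above, but that wiring is an assumption and is not shown.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; it is not part of the Beam SDK. */
final class LaggingWatermark {
    /** Stand-in for "negative infinity": reported until an element is observed. */
    static final Instant MIN_WATERMARK = Instant.MIN;

    private final Duration lag;                 // how far the watermark trails the latest element
    private final Duration minAdvanceInterval;  // minimum processing time between advances
    private Instant latestEventTime;            // null until the first element is observed
    private Instant reported = MIN_WATERMARK;   // last watermark handed out
    private Instant lastAdvance;                // processing time of the last advance, or null

    public LaggingWatermark(Duration lag, Duration minAdvanceInterval) {
        this.lag = lag;
        this.minAdvanceInterval = minAdvanceInterval;
    }

    /** Records the event-time timestamp of an element read from the source. */
    public void observe(Instant eventTime) {
        if (latestEventTime == null || eventTime.isAfter(latestEventTime)) {
            latestEventTime = eventTime;
        }
    }

    /** Returns the watermark to report at processing time {@code now}; it never moves backwards. */
    public Instant current(Instant now) {
        boolean throttled = lastAdvance != null
                && Duration.between(lastAdvance, now).compareTo(minAdvanceInterval) < 0;
        if (latestEventTime == null || throttled) {
            return reported;
        }
        Instant candidate = latestEventTime.minus(lag);
        if (candidate.isAfter(reported)) {
            reported = candidate;
            lastAdvance = now;
        }
        return reported;
    }

    public static void main(String[] args) {
        LaggingWatermark wm =
                new LaggingWatermark(Duration.ofSeconds(5), Duration.ofSeconds(1));
        Instant start = Instant.now();
        wm.observe(Instant.ofEpochSecond(100));
        System.out.println(wm.current(start));                 // first advance
        wm.observe(Instant.ofEpochSecond(200));
        System.out.println(wm.current(start.plusMillis(500))); // throttled, unchanged
        System.out.println(wm.current(start.plusSeconds(1)));  // allowed to advance again
    }
}
```

The processing time is passed in as an argument rather than read from the system clock so that the throttling is easy to exercise in a test. Keeping the reported value monotonic matters because, as Thomas notes, elements behind the watermark of an unbounded source may be dropped.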