Hi,

A follow-up question. I found that the getWatermark() API is only available on UnboundedSource.UnboundedReader. BoundedSource.BoundedReader provides a getCurrentTimestamp() API whose comment says "By default, returns the minimum possible timestamp", which sounds like a watermark. Is there any reason for the difference in method names?
Shen

On Thu, Apr 20, 2017 at 11:46 PM, Shen Li <cs.she...@gmail.com> wrote:

> Thanks!
>
> Shen
>
> On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>
>> In KafkaIO, it's possible to provide a customized watermark function to
>> control how the current watermark advances. I'm not familiar with other
>> unbounded IOs, but I assume they support it too, since getWatermark() is
>> defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
>>
>> Here is a quick example that holds the watermark 10 seconds behind
>> processing time; you can implement more complex logic based on the
>> KafkaRecord content:
>>
>>   KafkaIO.<String, String>read()
>>       // bootstrap servers, topics and deserializers omitted
>>       .withWatermarkFn2(new SerializableFunction<KafkaRecord<String, String>, Instant>() {
>>         @Override
>>         public Instant apply(KafkaRecord<String, String> input) {
>>           // Joda-Time: current processing time minus 10 seconds
>>           return new Instant().minus(Duration.standardSeconds(10));
>>         }
>>       });
>>
>> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>
>>> You want to use an existing source but just change the watermark tracking?
>>> You can't do this in your pipeline right now, but you could probably easily
>>> wrap a source and proxy every method except getWatermark, though I have
>>> never tried.
>>>
>>> The general feature that might address this is discussed a little in
>>> https://issues.apache.org/jira/browse/BEAM-644
>>>
>>> There are also related ideas in the discussions about Splittable DoFn.
>>>
>>> Kenn
>>>
>>> On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can application developers provide classes/methods to specify how to
>>>> generate watermarks from sources, and how to aggregate watermarks from
>>>> multiple input PCollections? For example, emit at most one watermark per
>>>> second, or create watermarks that are 5 seconds older than the latest
>>>> tuple's timestamp?
>>>>
>>>> Thanks,
>>>>
>>>> Shen
>>
>> --
>> Mingmin
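Kenn's idea of wrapping a source and proxying every method except getWatermark can be sketched in plain Java. To keep the sketch self-contained and runnable without Beam on the classpath, SimpleUnboundedReader, LaggingWatermarkReader and ListReader are hypothetical names: the interface only mirrors a few method names of Beam's UnboundedSource.UnboundedReader (start, advance, getCurrent, getCurrentTimestamp, getWatermark), and java.time.Instant stands in for the Joda-Time Instant that Beam uses. The wrapper applies the two policies from the original question: the watermark trails the largest element timestamp seen so far by a fixed lag, and it is refreshed at most once per interval of processing time (the clock is injected so the behavior can be tested).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Hypothetical stand-in mirroring a few method names of Beam's UnboundedReader; not the Beam API. */
interface SimpleUnboundedReader<T> {
  boolean start();    // move to the first element, if any
  boolean advance();  // move to the next element, if any
  T getCurrent();
  Instant getCurrentTimestamp();
  Instant getWatermark();
}

/**
 * Proxies every method to a delegate except getWatermark(), which reports
 * (largest element timestamp seen so far - lag), refreshed at most once per
 * minInterval of processing time and never moving backwards.
 */
final class LaggingWatermarkReader<T> implements SimpleUnboundedReader<T> {
  private final SimpleUnboundedReader<T> delegate;
  private final Duration lag;
  private final Duration minInterval;
  private final Supplier<Instant> clock;  // processing-time clock, injectable for tests

  private Instant maxSeen = null;           // largest element timestamp observed so far
  private Instant watermark = Instant.MIN;  // last reported watermark
  private Instant lastRefresh = null;       // processing time of the last refresh

  LaggingWatermarkReader(SimpleUnboundedReader<T> delegate, Duration lag,
      Duration minInterval, Supplier<Instant> clock) {
    this.delegate = delegate;
    this.lag = lag;
    this.minInterval = minInterval;
    this.clock = clock;
  }

  @Override public boolean start() { return track(delegate.start()); }
  @Override public boolean advance() { return track(delegate.advance()); }
  @Override public T getCurrent() { return delegate.getCurrent(); }
  @Override public Instant getCurrentTimestamp() { return delegate.getCurrentTimestamp(); }

  // Record the timestamp of each element the delegate moves to.
  private boolean track(boolean hasElement) {
    if (hasElement) {
      Instant ts = delegate.getCurrentTimestamp();
      if (maxSeen == null || ts.isAfter(maxSeen)) {
        maxSeen = ts;
      }
    }
    return hasElement;
  }

  @Override
  public Instant getWatermark() {
    Instant now = clock.get();
    boolean due = lastRefresh == null || !now.isBefore(lastRefresh.plus(minInterval));
    if (due && maxSeen != null) {
      Instant candidate = maxSeen.minus(lag);
      if (candidate.isAfter(watermark)) {
        watermark = candidate;  // only ever move forward
      }
      lastRefresh = now;
    }
    return watermark;
  }
}

/** Hypothetical in-memory reader over fixed timestamps, used only to exercise the wrapper. */
final class ListReader implements SimpleUnboundedReader<String> {
  private final Instant[] timestamps;
  private int index = -1;

  ListReader(Instant... timestamps) { this.timestamps = timestamps; }

  @Override public boolean start() { return advance(); }
  @Override public boolean advance() {
    if (index < timestamps.length) index++;
    return index < timestamps.length;
  }
  @Override public String getCurrent() { check(); return "element-" + index; }
  @Override public Instant getCurrentTimestamp() { check(); return timestamps[index]; }
  @Override public Instant getWatermark() { return Instant.MIN; }  // ignored by the wrapper

  private void check() {
    if (index < 0 || index >= timestamps.length) throw new NoSuchElementException();
  }
}
```

With a lag of 5 seconds and a refresh interval of 1 second, elements stamped 12:00:00, 12:00:10 and 12:00:03 yield a watermark of 11:59:55, which stays there until a second of processing time has passed, then jumps to 12:00:05 and does not regress when the late 12:00:03 element arrives. A real Beam wrapper would also need to proxy the remaining UnboundedReader methods (for example getCheckpointMark() and getCurrentSource()) and wrap the enclosing UnboundedSource itself; the sketch leaves those out.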