In KafkaIO, you can provide a custom watermark function to control how the
current watermark advances. I'm not familiar with the other unbounded IOs,
but I assume they could support this too, since getWatermark() is defined
in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.

Here is a quick example that holds the watermark 10 seconds behind
processing time. You can implement more complex logic based on the
KafkaRecord content.
KafkaIO.<String, String>read()
  .withWatermarkFn2(
      new SerializableFunction<KafkaRecord<String, String>, Instant>() {
        @Override
        public Instant apply(KafkaRecord<String, String> input) {
          // Watermark = current processing time minus 10 seconds.
          return Instant.now().minus(Duration.standardSeconds(10));
        }
      })
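
For the case asked about in the quoted question (watermarks that are 5
seconds older than the latest tuple's timestamp), the logic could look
roughly like the sketch below. It is plain Java using java.time and is not
wired into KafkaIO. LaggedWatermark and observe() are hypothetical names
made up for illustration, and the rule that the watermark never moves
backward is an assumption of this sketch.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of Beam or KafkaIO.
final class LaggedWatermark {
    private final Duration lag;
    // Largest event timestamp seen so far; starts at the epoch.
    private Instant maxEventTime = Instant.EPOCH;

    LaggedWatermark(Duration lag) {
        this.lag = lag;
    }

    // Records one event timestamp and returns the resulting watermark:
    // the largest timestamp seen so far minus the configured lag.
    // Late (older) events do not move the watermark backward.
    Instant observe(Instant eventTime) {
        if (eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
        return maxEventTime.minus(lag);
    }
}
```

To use something like this with withWatermarkFn2, it would have to be
adapted to Joda-Time's Instant and to a SerializableFunction that reads the
timestamp from each KafkaRecord. That adaptation is left out here.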


On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> You want to use an existing source but just change the watermark tracking?
> You can't do this in your pipeline right now, but you could probably easily
> wrap a source and proxy every method except getWatermark, though I have
> never tried.
>
> The general feature that might address this is discussed a little on
> https://issues.apache.org/jira/browse/BEAM-644
>
> There are also related ideas in the discussions about Splittable DoFn.
>
> Kenn
>
> On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
>
> > Hi,
> >
> > Can application developers provide classes/methods to specify how to
> > generate watermarks from sources, and how to aggregate watermarks from
> > multiple input PCollections? Say, emit at most 1 watermark per second, or
> > create watermarks that are 5 seconds older than the latest tuple's
> > timestamp?
> >
> > Thanks,
> >
> > Shen
> >
>



-- 
----
Mingmin
