Thanks!

Shen

On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:

> In KafkaIO, you can provide a custom watermark function to control how the
> current watermark advances. I'm not familiar with the other unbounded IOs,
> but I assume they can support this too, since getWatermark() is defined in
> org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
>
> Here is a quick example that holds the watermark 10 seconds behind
> processing time; you can implement more complex logic based on the
> KafkaRecord content.
> KafkaIO.<String, String>read()
>   .withWatermarkFn2(
>       new SerializableFunction<KafkaRecord<String, String>, Instant>() {
>         @Override
>         public Instant apply(KafkaRecord<String, String> input) {
>           // Joda-Time Instant: current processing time minus 10 seconds.
>           return Instant.now().minus(Duration.standardSeconds(10));
>         }
>       })
>
>
> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > You want to use an existing source but just change the watermark tracking?
> > You can't do this in your pipeline right now, but you could probably easily
> > wrap a source and proxy every method except getWatermark, though I have
> > never tried.
> >
> > The general feature that might address this is discussed a little on
> > https://issues.apache.org/jira/browse/BEAM-644
> >
> > There are also related ideas in the discussions about Splittable DoFn.
> >
> > Kenn
> >
> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Can application developers provide classes/methods to specify how to
> > > generate watermarks from sources, and how to aggregate watermarks from
> > > multiple input PCollections? Say, emit at most 1 watermark per second, or
> > > create watermarks that are 5 seconds older than the latest tuple's
> > > timestamp?
> > >
> > > Thanks,
> > >
> > > Shen
> > >
> >
>
>
>
> --
> ----
> Mingmin
>
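
The policy asked about in the original question (watermarks 5 seconds older than the latest tuple's timestamp, advanced at most once per second) can be sketched independently of any Beam API. The class below is a minimal illustration only: BoundedLagWatermark, observe() and getWatermark(now) are hypothetical names, not part of Beam, and it uses java.time rather than the Joda-Time types that the Beam SDK uses. A reader wrapper of the kind Kenn describes could, in principle, delegate its getWatermark() to logic like this.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not a Beam class): keeps a watermark that trails the
 * newest observed event timestamp by a fixed lag and advances at most once
 * per minimum interval of processing time.
 */
class BoundedLagWatermark {
    private final Duration lag;             // how far the watermark trails the newest event
    private final Duration minEmitInterval; // minimum processing time between advances
    private Instant maxEventTime = null;    // newest event timestamp seen so far
    private Instant lastAdvanceAt = null;   // processing time of the last advance
    private Instant current = Instant.MIN;  // watermark value currently reported

    BoundedLagWatermark(Duration lag, Duration minEmitInterval) {
        this.lag = lag;
        this.minEmitInterval = minEmitInterval;
    }

    /** Record the event timestamp of a newly read element. */
    void observe(Instant eventTime) {
        if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
    }

    /** Return the watermark as of processing time {@code now}; it never moves backwards. */
    Instant getWatermark(Instant now) {
        if (maxEventTime == null) {
            return current; // nothing observed yet
        }
        boolean due = lastAdvanceAt == null
            || !now.isBefore(lastAdvanceAt.plus(minEmitInterval));
        if (due) {
            Instant candidate = maxEventTime.minus(lag);
            if (candidate.isAfter(current)) {
                current = candidate;
                lastAdvanceAt = now;
            }
        }
        return current;
    }
}
```

With new BoundedLagWatermark(Duration.ofSeconds(5), Duration.ofSeconds(1)), a caller would invoke observe() for every element read and getWatermark(Instant.now()) whenever the runner asks for the watermark. Passing the processing time in as a parameter is a design choice that keeps the class deterministic and easy to test.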
