Thanks! Shen
On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:

> In KafkaIO, it's possible to provide a customized watermark function to
> control how the current watermark advances. I'm not familiar with other
> unbounded IOs, but I assume they support it too, since getWatermark() is
> defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
>
> Here is a quick example that holds the watermark 10 seconds behind
> processing time; you can add more complex logic based on the KafkaRecord
> content:
>
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.transforms.SerializableFunction;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
>
> KafkaIO.<String, String>read()
>     .withWatermarkFn2(new SerializableFunction<KafkaRecord<String, String>, Instant>() {
>       @Override
>       public Instant apply(KafkaRecord<String, String> input) {
>         // Ignore the record and report "now minus 10 seconds" as the watermark.
>         return new Instant().minus(Duration.standardSeconds(10));
>       }
>     });
>
>
> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > You want to use an existing source but just change the watermark tracking?
> > You can't do this in your pipeline right now, but you could probably
> > easily wrap a source and proxy every method except getWatermark, though I
> > have never tried.
> >
> > The general feature that might address this is discussed a little in
> > https://issues.apache.org/jira/browse/BEAM-644
> >
> > There are also related ideas in the discussions about Splittable DoFn.
> >
> > Kenn
> >
> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Can application developers provide classes/methods to specify how to
> > > generate watermarks from sources, and how to aggregate watermarks from
> > > multiple input PCollections? Say, emit at most 1 watermark per second,
> > > or create watermarks that are 5 seconds older than the latest tuple's
> > > timestamp?
> > >
> > > Thanks,
> > >
> > > Shen
>
> --
> Mingmin
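For the policy Shen describes (a watermark 5 seconds older than the latest element timestamp, refreshed at most once per second), the bookkeeping that a wrapped reader's getWatermark() might delegate to could look roughly like the sketch below. It is a plain-Java illustration under stated assumptions, not a Beam API: the class LaggingWatermarkPolicy and its methods observe, currentWatermark and minOf are hypothetical names, it uses java.time instead of the Joda types Beam uses, and combining several inputs by taking the minimum watermark is an assumed aggregation rule.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;

/**
 * Hypothetical watermark policy (not part of Beam): the watermark trails the
 * largest event timestamp seen so far by a fixed lag, and the reported value
 * is refreshed at most once per minUpdateInterval of processing time.
 */
final class LaggingWatermarkPolicy {
  private final Duration lag;
  private final Duration minUpdateInterval;
  private Instant maxEventTime;             // null until the first element arrives
  private Instant watermark = Instant.MIN;  // "no progress yet"
  private Instant lastUpdate;               // processing time of the last refresh

  LaggingWatermarkPolicy(Duration lag, Duration minUpdateInterval) {
    this.lag = lag;
    this.minUpdateInterval = minUpdateInterval;
  }

  /** Records an element's event timestamp; late elements never move it back. */
  void observe(Instant eventTime) {
    if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
      maxEventTime = eventTime;
    }
  }

  /** Returns the watermark, recomputing it only if the update interval has elapsed. */
  Instant currentWatermark(Instant now) {
    boolean due = lastUpdate == null || !now.isBefore(lastUpdate.plus(minUpdateInterval));
    if (due && maxEventTime != null) {
      Instant candidate = maxEventTime.minus(lag);
      if (candidate.isAfter(watermark)) {
        watermark = candidate;  // keep the watermark monotonic
      }
      lastUpdate = now;
    }
    return watermark;
  }

  /** Combines watermarks of several inputs by taking the earliest; MAX if empty. */
  static Instant minOf(Collection<Instant> inputWatermarks) {
    Instant result = Instant.MAX;
    for (Instant w : inputWatermarks) {
      if (w.isBefore(result)) {
        result = w;
      }
    }
    return result;
  }
}
```

Taking the minimum means a downstream step never treats event time as complete until every input has reached it; rate limiting only delays how often the lagged value is recomputed, it never makes the watermark go backwards.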