Hi,

A follow-up question. I found that the getWatermark() API is only available
for UnboundedSource (it is defined on UnboundedSource.UnboundedReader). The
reader for BoundedSource (BoundedSource.BoundedReader) provides a
getCurrentTimestamp() API whose comment says "By default, returns the minimum
possible timestamp", which sounds like a watermark. Is there any reason for
the difference in method names?

Shen

On Thu, Apr 20, 2017 at 11:46 PM, Shen Li <cs.she...@gmail.com> wrote:

> Thanks!
>
> Shen
>
>
> On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:
>
>> In KafkaIO, it's possible to provide a customized watermark function to
>> control how the current watermark advances. I'm not familiar with the other
>> unbounded IOs, but I assume they support this too, since getWatermark() is
>> defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
>>
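>> A quick example that holds the watermark 10 seconds behind processing time;
>> you can implement more complex logic based on the KafkaRecord content:
>>
>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>> import org.apache.beam.sdk.transforms.SerializableFunction;
>> import org.joda.time.Duration;
>> import org.joda.time.Instant;
>>
>> KafkaIO.<String, String>read()
>>     // other required settings (bootstrap servers, topics, key/value decoding) omitted
>>     .withWatermarkFn2(
>>         new SerializableFunction<KafkaRecord<String, String>, Instant>() {
>>           @Override
>>           public Instant apply(KafkaRecord<String, String> input) {
>>             // Watermark = current processing time minus 10 seconds.
>>             return new Instant().minus(Duration.standardSeconds(10));
>>           }
>>         });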
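As a sketch of the "more complex logic" mentioned above, and of the "5 seconds older than the latest tuple's timestamp" policy asked about further down the thread, the class below keeps the watermark a fixed lag behind the largest event timestamp seen so far. It is a plain-Java illustration under stated assumptions: it uses java.time rather than the Joda-Time types Beam uses, it takes the event time directly instead of extracting it from a KafkaRecord, and the class name BoundedLagWatermark is hypothetical. Whether a stateful function like this behaves as intended when passed to withWatermarkFn2 (for example, how its state survives serialization and checkpointing) has not been tested.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical "bounded lag" watermark: trail the largest event timestamp seen
// so far by a fixed allowance. Plain Java; not a Beam or KafkaIO API.
final class BoundedLagWatermark {
  private final Duration maxLag;
  private Instant maxEventTime; // null until the first record has been seen

  BoundedLagWatermark(Duration maxLag) {
    this.maxLag = maxLag;
  }

  // Called once per record with that record's event time (in KafkaIO this would
  // have to be extracted from the KafkaRecord, which is assumed here).
  // Returns the watermark to report after this record.
  Instant apply(Instant eventTime) {
    if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
      maxEventTime = eventTime;
    }
    return maxEventTime.minus(maxLag);
  }
}
```

Because the maximum event time only grows, the returned watermark never moves backwards, even when records arrive out of order.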
>>
>>
>> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>
>> > You want to use an existing source but just change the watermark tracking?
>> > You can't do this in your pipeline right now, but you could probably easily
>> > wrap a source and proxy every method except getWatermark, though I have
>> > never tried.
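The wrapping approach described above amounts to plain delegation. To keep the sketch self-contained, SimpleReader is a hypothetical, trimmed-down stand-in for UnboundedSource.UnboundedReader: the real class has more methods (for example getCheckpointMark() and getCurrentSource()) and some of them throw IOException. A real wrapper would forward those unchanged as well, and would also need a wrapping UnboundedSource whose createReader() returns the wrapped reader. WatermarkOverridingReader and ListReader are likewise hypothetical names.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.Function;

// Hypothetical, trimmed-down stand-in for UnboundedSource.UnboundedReader.
interface SimpleReader<T> {
  boolean start();
  boolean advance();
  T getCurrent();
  Instant getCurrentTimestamp();
  Instant getWatermark();
}

// Forwards every call to the wrapped reader except getWatermark(), which is
// computed by a caller-supplied function of the wrapped reader.
final class WatermarkOverridingReader<T> implements SimpleReader<T> {
  private final SimpleReader<T> delegate;
  private final Function<SimpleReader<T>, Instant> watermarkFn;

  WatermarkOverridingReader(SimpleReader<T> delegate,
                            Function<SimpleReader<T>, Instant> watermarkFn) {
    this.delegate = delegate;
    this.watermarkFn = watermarkFn;
  }

  @Override public boolean start() { return delegate.start(); }
  @Override public boolean advance() { return delegate.advance(); }
  @Override public T getCurrent() { return delegate.getCurrent(); }
  @Override public Instant getCurrentTimestamp() { return delegate.getCurrentTimestamp(); }

  // The only method that is not simply forwarded.
  @Override public Instant getWatermark() { return watermarkFn.apply(delegate); }
}

// Toy in-memory reader, used only to exercise the wrapper.
final class ListReader implements SimpleReader<String> {
  private final List<Instant> timestamps;
  private int index = -1;

  ListReader(List<Instant> timestamps) { this.timestamps = timestamps; }

  @Override public boolean start() { return advance(); }
  @Override public boolean advance() { index++; return index < timestamps.size(); }
  @Override public String getCurrent() { return "record-" + index; }
  @Override public Instant getCurrentTimestamp() { return timestamps.get(index); }
  // Original policy: the watermark equals the current record's timestamp.
  @Override public Instant getWatermark() { return getCurrentTimestamp(); }
}
```

For example, wrapping a ListReader with `r -> r.getWatermark().minusSeconds(10)` reports a watermark 10 seconds behind what the original reader would report, while records and timestamps pass through untouched.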
>> >
>> > The general feature that might address this is discussed a little on
>> > https://issues.apache.org/jira/browse/BEAM-644
>> >
>> > There are also related ideas in the discussions about Splittable DoFn.
>> >
>> > Kenn
>> >
>> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
>> >
>> > > Hi,
>> > >
>> > > Can application developers provide classes/methods to specify how to
>> > > generate watermarks from sources, and how to aggregate watermarks from
>> > > multiple input PCollections? Say, emit at most 1 watermark per second, or
>> > > create watermarks that are 5 seconds older than the latest tuple's
>> > > timestamp?
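The rate-limiting and multi-input parts of this question can be sketched independently of any runner. ThrottledWatermark (a hypothetical name, not a Beam API) publishes a new watermark at most once per interval of processing time; it takes the wall-clock time as a parameter so the logic can be tested deterministically. combine() takes the minimum over several input watermarks, which is the usual rule for deriving a downstream watermark from multiple inputs, since the slowest input must hold the result back. Whether and where Beam lets user code plug in such policies is what this thread is about, so treat this only as an illustration of the logic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;

// Hypothetical watermark publisher: at most one new watermark per interval.
final class ThrottledWatermark {
  private final Duration minInterval;
  private Instant lastPublishedAt; // processing time of the last publication, null at first
  private Instant published;       // last published watermark, null at first

  ThrottledWatermark(Duration minInterval) {
    this.minInterval = minInterval;
  }

  // Offers a candidate watermark observed at wall-clock time `now` and returns
  // the currently published watermark. A candidate offered too early is not
  // queued; the next call after the interval publishes whatever it is given,
  // provided that value advances the watermark.
  Instant offer(Instant candidate, Instant now) {
    boolean intervalElapsed =
        lastPublishedAt == null || !now.isBefore(lastPublishedAt.plus(minInterval));
    boolean advances = published == null || candidate.isAfter(published);
    if (intervalElapsed && advances) {
      published = candidate;
      lastPublishedAt = now;
    }
    return published;
  }

  // Combined watermark of several inputs: the minimum, so the slowest input wins.
  static Instant combine(Collection<Instant> inputWatermarks) {
    if (inputWatermarks.isEmpty()) {
      throw new IllegalArgumentException("need at least one input watermark");
    }
    Instant min = null;
    for (Instant w : inputWatermarks) {
      if (min == null || w.isBefore(min)) {
        min = w;
      }
    }
    return min;
  }
}
```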
>> > >
>> > > Thanks,
>> > >
>> > > Shen
>> > >
>> >
>>
>>
>>
>> --
>> ----
>> Mingmin
>>
>
>
