Thank you, Thomas. That clears up my confusion.  :)

Shen

On Tue, Apr 25, 2017 at 7:30 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

> getCurrentTimestamp returns the timestamp of the current element. Both
> Bounded and Unbounded Readers have this method.
>
> For a bounded source, this is safe - the source watermark can be held to
> negative infinity while elements remain in the source and advance to
> infinity after all elements are read, and elements can be arbitrarily
> shifted forwards in time later in the pipeline (for example, via a
> "WithTimestamps" transform or a DoFn that uses "outputWithTimestamp"). It's
> not safe to output elements at negative infinity when there is a watermark
> that may drop elements, as is the case for unbounded sources.
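
To check my understanding of the bounded case, here is a minimal sketch in plain Java. It is not Beam code: the class BoundedWatermarkSketch and the helper shiftForward are hypothetical names, and java.time's Instant.MIN / Instant.MAX are only stand-ins for Beam's minimum and maximum timestamps.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Toy model of a bounded reader's watermark; not a Beam class. */
final class BoundedWatermarkSketch {
  // Assumption: java.time extremes stand in for Beam's minimum and maximum timestamps.
  static final Instant MIN = Instant.MIN;
  static final Instant MAX = Instant.MAX;

  private final Iterator<String> remaining;

  BoundedWatermarkSketch(List<String> elements) {
    this.remaining = elements.iterator();
  }

  /** Held at MIN while elements remain, then advanced to MAX. */
  Instant watermark() {
    return remaining.hasNext() ? MIN : MAX;
  }

  /** Reads the next element; in this sketch its default timestamp is MIN. */
  String next() {
    return remaining.next();
  }

  /** In this sketch a later step may move a timestamp forwards, never backwards. */
  static Instant shiftForward(Instant original, Instant assigned) {
    if (assigned.isBefore(original)) {
      throw new IllegalArgumentException("timestamps may only move forwards");
    }
    return assigned;
  }

  public static void main(String[] args) {
    BoundedWatermarkSketch reader = new BoundedWatermarkSketch(Arrays.asList("a", "b"));
    while (reader.watermark().equals(MIN)) {
      String element = reader.next();
      // Downstream step assigns the real event time (1000 s is an arbitrary example).
      Instant shifted = shiftForward(MIN, Instant.ofEpochSecond(1_000));
      System.out.println(element + " @ " + shifted);
    }
    System.out.println("watermark at max: " + reader.watermark().equals(MAX));
  }
}
```

Because the watermark never passes the minimum timestamp until the source is drained, no element can be late, whatever timestamp it is shifted to afterwards.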
>
> On Fri, Apr 21, 2017 at 8:44 AM, Shen Li <cs.she...@gmail.com> wrote:
>
> > Hi,
> >
> > A follow-up question. I found that the getWatermark() API is only available
> > for UnboundedSource. BoundedSource provides a getCurrentTimestamp() API
> > with comments "By default, returns the minimum possible timestamp", which
> > sounds like a watermark. Any reason for the difference in method names?
> >
> > Shen
> >
> > On Thu, Apr 20, 2017 at 11:46 PM, Shen Li <cs.she...@gmail.com> wrote:
> >
> > > Thanks!
> > >
> > > Shen
> > >
> > >
> > > On Thu, Apr 20, 2017 at 8:07 PM, Mingmin Xu <mingm...@gmail.com> wrote:
> > >
> > >> In KafkaIO, it's possible to provide a customized watermark function to
> > >> control how the current watermark advances. I'm not familiar with other
> > >> unbounded IOs, but I assume they support it too, as getWatermark() is
> > >> defined in org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.
> > >>
> > >> A quick example that holds the watermark 10 seconds behind processing
> > >> time; you can write more complex logic based on the KafkaRecord content.
> > >> KafkaIO.<String, String>read()
> > >>   .withWatermarkFn2(
> > >>     new SerializableFunction<KafkaRecord<String, String>, Instant>() {
> > >>       @Override
> > >>       public Instant apply(KafkaRecord<String, String> input) {
> > >>         // Joda-Time: current processing time minus 10 seconds.
> > >>         return new Instant().minus(Duration.standardSeconds(10));
> > >>       }
> > >>     })
> > >>
> > >>
> > >> On Thu, Apr 20, 2017 at 2:44 PM, Kenneth Knowles <k...@google.com.invalid>
> > >> wrote:
> > >>
> > >> > You want to use an existing source but just change the watermark
> > >> > tracking? You can't do this in your pipeline right now, but you could
> > >> > probably easily wrap a source and proxy every method except
> > >> > getWatermark, though I have never tried.
> > >> >
> > >> > The general feature that might address this is discussed a little on
> > >> > https://issues.apache.org/jira/browse/BEAM-644
> > >> >
> > >> > There are also related ideas in the discussions about Splittable DoFn.
> > >> >
> > >> > Kenn
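
The wrap-and-proxy idea could look roughly like the sketch below. It is plain Java, not the Beam API: SimpleReader is a hypothetical, much smaller interface than UnboundedReader, and LaggedWatermarkReader and FixedReader are illustrative names. The only point is the delegation pattern, where every call is forwarded except getWatermark().

```java
import java.time.Duration;
import java.time.Instant;

final class WatermarkWrapperSketch {
  /** Hypothetical, simplified reader interface; Beam's UnboundedReader has more methods. */
  interface SimpleReader {
    boolean advance();
    String getCurrent();
    Instant getCurrentTimestamp();
    Instant getWatermark();
  }

  /** Forwards every call to the wrapped reader except getWatermark(). */
  static final class LaggedWatermarkReader implements SimpleReader {
    private final SimpleReader delegate;
    private final Duration lag;

    LaggedWatermarkReader(SimpleReader delegate, Duration lag) {
      this.delegate = delegate;
      this.lag = lag;
    }

    @Override public boolean advance() { return delegate.advance(); }
    @Override public String getCurrent() { return delegate.getCurrent(); }
    @Override public Instant getCurrentTimestamp() { return delegate.getCurrentTimestamp(); }

    /** The one overridden method: hold the delegate's watermark back by a fixed lag. */
    @Override public Instant getWatermark() { return delegate.getWatermark().minus(lag); }
  }

  /** Tiny in-memory reader used only to exercise the wrapper. */
  static final class FixedReader implements SimpleReader {
    private final long[] epochMillis;
    private int index = -1;

    FixedReader(long... epochMillis) { this.epochMillis = epochMillis; }

    @Override public boolean advance() {
      if (index + 1 >= epochMillis.length) {
        return false;
      }
      index++;
      return true;
    }

    @Override public String getCurrent() { return "element-" + index; }
    @Override public Instant getCurrentTimestamp() { return Instant.ofEpochMilli(epochMillis[index]); }

    /** Watermark equals the current element's timestamp (epoch before the first read). */
    @Override public Instant getWatermark() {
      return index < 0 ? Instant.EPOCH : Instant.ofEpochMilli(epochMillis[index]);
    }
  }

  public static void main(String[] args) {
    SimpleReader reader =
        new LaggedWatermarkReader(new FixedReader(10_000L, 20_000L), Duration.ofSeconds(5));
    while (reader.advance()) {
      System.out.println(reader.getCurrent() + " ts=" + reader.getCurrentTimestamp()
          + " watermark=" + reader.getWatermark());
    }
  }
}
```

A real wrapper would also have to proxy the source itself (splitting, checkpointing, coders), which this sketch leaves out.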
> > >> >
> > >> > On Thu, Apr 20, 2017 at 1:42 PM, Shen Li <cs.she...@gmail.com> wrote:
> > >> >
> > >> > > Hi,
> > >> > >
> > >> > > Can application developers provide classes/methods to specify how to
> > >> > > generate watermarks from sources, and how to aggregate watermarks
> > >> > > from multiple input PCollections? Say, emit at most 1 watermark per
> > >> > > second, or create watermarks that are 5 seconds older than the latest
> > >> > > tuple's timestamp?
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Shen
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> ----
> > >> Mingmin
> > >>
> > >
> > >
> >
>
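
For reference, the policy from my original question (a watermark 5 seconds older than the latest element's timestamp, updated at most once per second) can be sketched in plain Java as below. This is only an illustration under my own assumptions, not anything Beam provides: ThrottledLaggingWatermark, observe and currentWatermark are hypothetical names, the starting watermark of Instant.EPOCH is arbitrary, and processing time is passed in explicitly so the logic is easy to test.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical watermark policy; not part of Beam. */
final class ThrottledLaggingWatermark {
  private final Duration lag;          // how far the watermark trails the newest element
  private final Duration minInterval;  // minimum processing-time gap between updates
  private Instant maxEventTime = null; // newest element timestamp seen so far
  private Instant lastUpdate = null;   // processing time of the last update
  private Instant watermark = Instant.EPOCH; // assumption: arbitrary starting value

  ThrottledLaggingWatermark(Duration lag, Duration minInterval) {
    this.lag = lag;
    this.minInterval = minInterval;
  }

  /** Records an element's event timestamp. */
  void observe(Instant eventTime) {
    if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
      maxEventTime = eventTime;
    }
  }

  /** Returns the watermark, recomputing it at most once per minInterval. */
  Instant currentWatermark(Instant now) {
    boolean due = lastUpdate == null || !now.isBefore(lastUpdate.plus(minInterval));
    if (due && maxEventTime != null) {
      Instant candidate = maxEventTime.minus(lag);
      if (candidate.isAfter(watermark)) {
        watermark = candidate; // never move the watermark backwards
      }
      lastUpdate = now;
    }
    return watermark;
  }

  public static void main(String[] args) {
    ThrottledLaggingWatermark policy =
        new ThrottledLaggingWatermark(Duration.ofSeconds(5), Duration.ofSeconds(1));
    policy.observe(Instant.ofEpochSecond(100));
    System.out.println(policy.currentWatermark(Instant.ofEpochMilli(0)));
    policy.observe(Instant.ofEpochSecond(200));
    System.out.println(policy.currentWatermark(Instant.ofEpochMilli(500)));
    System.out.println(policy.currentWatermark(Instant.ofEpochMilli(1_000)));
  }
}
```

The second call in main falls inside the one-second throttle window, so it returns the previous watermark even though a newer element has been observed.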
