The specifics of naming diverged a bit from the FLIP during implementation, but that should be fine. What matters in the end is the intention of the FLIP and that the committed code is good and internally consistent.

Best,
Aljoscha

On 24.05.20 05:12, Guanghui Zhang wrote:
Hi @Aljoscha,
the comment for the function parameter currentTimestamp does not match
recordTimestamp in "long extractTimestamp(T element, long recordTimestamp)"
on the wiki.

Best,
Zhangguanghui

On Wed, May 13, 2020 at 12:28 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Thank you for the update and sorry again for chiming in so late...

Best,

Dawid


On 12/05/2020 18:21, Aljoscha Krettek wrote:
Yes, I am also ok with a SerializableTimestampAssigner. This only
looks a bit clumsy in the API, but as a user (who uses lambdas) you
should not see this. I pushed changes for this to my branch:
https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased

And yes, recordTimestamp sounds good for the TimestampAssigner. I
admit I didn't read this well enough and only saw nativeTimestamp.

Best,
Aljoscha

On 12.05.20 17:16, Dawid Wysakowicz wrote:
I have similar thoughts to @Stephan

Ad 1. I tried something like this on your branch:

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * For top-level classes that implement both Serializable and TimestampAssigner.
     */
    public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(
            TA timestampAssigner) {
        // checkNotNull: Flink's org.apache.flink.util.Preconditions
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

    /** Mixes Serializable into TimestampAssigner so that lambdas targeting it are serializable. */
    @FunctionalInterface
    public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
    }

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * Helper method for serializable lambdas.
     */
    public WatermarkStrategies<T> withTimestampAssigner(
            SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

But I understand if that's too hacky. It's just a pity that we must
enforce limitations on an interface that are not strictly necessary.
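
A minimal, self-contained sketch of why the mixin approach is enough for lambdas: a lambda whose target type extends java.io.Serializable is itself serializable, while the same lambda targeting a plain TimestampAssigner is not. TimestampAssigner and SerializableTimestampAssigner below are simplified stand-ins for the types in this thread, and StrategyBuilder (with its isSerializable helper) is a hypothetical reduction of WatermarkStrategies, not Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Simplified stand-in for the TimestampAssigner discussed here (not Flink's class).
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Mixes in Serializable, so lambdas that target this type are serializable too.
@FunctionalInterface
interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

// Hypothetical reduction of WatermarkStrategies with the lambda-friendly overload.
final class StrategyBuilder<T> {
    private TimestampAssigner<T> timestampAssigner;

    StrategyBuilder<T> withTimestampAssigner(SerializableTimestampAssigner<T> assigner) {
        this.timestampAssigner = Objects.requireNonNull(assigner, "timestampAssigner");
        return this;
    }

    TimestampAssigner<T> assigner() {
        return timestampAssigner;
    }

    // Returns true if the object survives standard Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }
}
```

With this overload, a plain `new StrategyBuilder<Long>().withTimestampAssigner((event, ts) -> event)` compiles without any cast.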

Ad 2/3

I am aware the watermark assigner/timestamp extractor can be applied
further down the graph. Originally I also wanted to suggest
sourceTimestamp and SourceTimestampAssigner, but then I realized it can
also be used after the sources, as you correctly pointed out. Even if
the TimestampAssigner is used after the source, there might be some
native/record timestamp in the StreamRecord that could have been
extracted by a previous assigner.

Best,

Dawid

On 12/05/2020 16:47, Stephan Ewen wrote:
@Aljoscha

About (1) could we have an interface SerializableTimestampAssigner that
simply mixes in the java.io.Serializable interface? Or will this be too
clumsy?

About (3) recordTimestamp seems to fit both cases (the timestamp in the
source record and the timestamp in the StreamRecord).

On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

Definitely +1 to point 2) raised by Dawid. I'm not sure about points
1) and 3).

1) I can see the benefit of that, but in reality most timestamp
assigners will probably need to be Serializable. If you look at my
(updated) POC branch [1] you can see how a TimestampAssigner would be
specified on the WatermarkStrategies helper class: [2]. The signature
of this would have to be changed to something like:

    public <TA extends TimestampAssigner<T> & Serializable>
            WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)

Then, however, it would not be possible for users to specify a lambda
or an anonymous inner class for the TimestampAssigner like this:

    WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
            .forGenerator(new PeriodicTestWatermarkGenerator())
            .withTimestampAssigner((event, timestamp) -> event)
            .build();
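
For comparison, a sketch of what users would have to write under the intersection-bound signature: a bare lambda does not compile there, but a Java intersection cast gives it a target type that is both a TimestampAssigner and Serializable. The types are simplified stand-ins, and StrategyBuilder and its example() method are hypothetical.

```java
import java.io.Serializable;

// Simplified stand-in for the TimestampAssigner discussed here (not Flink's class).
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long timestamp);
}

// Hypothetical reduction of WatermarkStrategies with the intersection-bound signature.
final class StrategyBuilder<T> {
    private TimestampAssigner<T> timestampAssigner;

    <TA extends TimestampAssigner<T> & Serializable> StrategyBuilder<T> withTimestampAssigner(
            TA timestampAssigner) {
        this.timestampAssigner = timestampAssigner;
        return this;
    }

    TimestampAssigner<T> timestampAssigner() {
        return timestampAssigner;
    }

    static StrategyBuilder<Long> example() {
        // .withTimestampAssigner((event, timestamp) -> event) would not compile here:
        // in an invocation context a lambda needs a functional interface type as its
        // target, and the type variable TA is not one. An intersection cast supplies
        // a target that is both a TimestampAssigner and Serializable.
        return new StrategyBuilder<Long>()
                .withTimestampAssigner(
                        (TimestampAssigner<Long> & Serializable) (event, timestamp) -> event);
    }
}
```

This works, but it is exactly the kind of call-site noise that a dedicated SerializableTimestampAssigner type avoids.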

3) This makes sense if we only allow WatermarkStrategies on sources,
where the previous timestamp really is the "native" timestamp.
Currently, we also allow setting watermark strategies at arbitrary
points in the graph. I'm thinking we probably should only allow that
in sources, but that is not the current reality. I'm not against
renaming it, just voicing those thoughts.

Best,
Aljoscha


[1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
[2] https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81


On 12.05.20 15:48, Stephan Ewen wrote:
+1 to all of Dawid's suggestions, makes a lot of sense to me

On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Aljoscha,

Sorry for adding comments during the vote, but I have some really
minor
suggestions that should not influence the voting thread imo.

1) Does it make sense to have the TimestampAssigner extend from Flink's
Function? This implies it has to be serializable, which with the
factory pattern is not strictly necessary, right? BTW I really like
that you suggested the FunctionalInterface annotation there.

2) Could we rename the IdentityTimestampAssigner to e.g.
RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...

Personally I found the IdentityTimestampAssigner a bit misleading, as
"identity" usually means a no-op. That did not click for me, as I
assumed it somehow returns the incoming record itself.

3) Could we rename the second parameter of
TimestampAssigner#extractTimestamp to e.g. recordTimestamp/nativeTimestamp?
This is similar to the point above. This parameter was also a bit
confusing for me, as I thought at times it's somehow related to
TimerService#currentProcessingTime()/currentWatermark(), i.e. the
current timestamp of the whole system.
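
A small sketch of the proposed naming, assuming simplified stand-in types (not Flink's classes): the second parameter is called recordTimestamp, and the assigner that just forwards it is called RecordTimestampAssigner rather than IdentityTimestampAssigner. ClickEvent is a hypothetical event type used only for the usage example; what value recordTimestamp holds when a record has no timestamp yet is left open here.

```java
// Simplified stand-in for TimestampAssigner with the renamed parameter.
@FunctionalInterface
interface TimestampAssigner<T> {
    /**
     * @param element the element to extract a timestamp from
     * @param recordTimestamp the timestamp the record already carries (for example one
     *     set by the source or by an earlier assigner), not the current system time
     */
    long extractTimestamp(T element, long recordTimestamp);
}

// Forwards the timestamp that is already attached to the record; it does not return the record.
final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
    @Override
    public long extractTimestamp(T element, long recordTimestamp) {
        return recordTimestamp;
    }
}

// Hypothetical event type whose timestamp is a field of the event itself.
final class ClickEvent {
    final long eventTime;

    ClickEvent(long eventTime) {
        this.eventTime = eventTime;
    }
}
```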

Other than that, I like the proposal, and I would have voted +1 if it
were not for those three points.

Best,

Dawid

On 11/05/2020 16:57, Jark Wu wrote:
Thanks for the explanation. I like that the factory pattern makes the
member variables immutable and final.

So +1 to the proposal.

Best,
Jark

On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org>
wrote:

I am fine with that.

Most of the principles seem to be agreed upon. I understand the need to
support code-generated extractors; we should already support most of
that (via the factories, as Aljoscha mentioned) and can extend this if
needed.

I think that the factory approach supports code-generated extractors in
an even cleaner way than an extractor with an open/init method.


On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org> wrote:

We're slightly running out of time. I would propose we vote on the
basic principle and remain open to later additions. This feature is
quite important to make the new Kafka Source that is developed as part
of FLIP-27 useful. Otherwise we would have to use the legacy
interfaces in the newly added connector.

I know that's a bit unorthodox, but would everyone be OK with what's
currently there and then we iterate?

Best,
Aljoscha

On 11.05.20 13:57, Aljoscha Krettek wrote:
Ah, I meant to write this in my previous email, sorry about that.

The WatermarkStrategy, which is basically a factory for a
WatermarkGenerator, is the replacement for the open() method. This is
the same strategy that was followed for StreamOperatorFactory, which
was introduced to allow code generation in the Table API [1]. If we
need metrics or other things, we would add them as parameters to the
factory method. What do you think?
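
A minimal sketch of the factory idea, with simplified stand-in interfaces rather than Flink's actual ones. GeneratorContext, its registerGauge method and the Strategies helper are hypothetical; they only illustrate how something an open() method would do, such as registering a current-watermark metric, can move into the factory method, so the generator's configuration can live in final fields set by its constructor. The generator assumes a non-negative maxOutOfOrderness.

```java
import java.io.Serializable;
import java.util.function.LongSupplier;

// Simplified stand-ins; not Flink's actual interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical context handed to the factory, e.g. for registering metrics.
@FunctionalInterface
interface GeneratorContext {
    void registerGauge(String name, LongSupplier gauge);
}

// The strategy is the serializable part; the generator is created on the task.
@FunctionalInterface
interface WatermarkStrategy<T> extends Serializable {
    WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context);
}

final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness; // fixed at construction, no open() needed
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(currentWatermark());
    }
}

final class Strategies {
    private Strategies() {}

    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(long maxOutOfOrderness) {
        return context -> {
            BoundedOutOfOrdernessGenerator<T> generator =
                    new BoundedOutOfOrdernessGenerator<>(maxOutOfOrderness);
            // What open() would have done (here: registering a metric) happens in the factory.
            context.registerGauge("currentWatermark", generator::currentWatermark);
            return generator;
        };
    }
}
```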

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-11974

On 10.05.20 05:07, Jark Wu wrote:
Hi,

Regarding `open()/close()`, I think it's necessary for Table&SQL to
compile the generated code.
In Table&SQL, the watermark strategy and the event timestamp are
defined using SQL expressions; we translate the expressions and
generate Java code for them. If we have `open()/close()`, we don't need
lazy initialization.
Besides that, I can see a need to report some metrics, e.g. the current
watermark, the dirty timestamps (null values), etc.
So I think a simple `open()/close()` with a context that can get a
MetricGroup is nice and not complex for the first version.

Best,
Jark



On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org>
wrote:
Thanks, Aljoscha, for picking this up.

I agree with the approach of doing the set of changes proposed here for
now. It already makes things simpler and adds idleness support
everywhere.

Rich functions and state always add complexity; let's do this in a
later step if we have a really compelling case.


On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org> wrote:

Regarding the WatermarkGenerator (WG) interface itself: the proposal is
basically to turn emitting into a "flatMap". We give the
WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
decide whether to output a watermark or not and can also mark the
output as idle. Changing the interface to return a Watermark (as the
previous watermark assigner interface did) would not allow that
flexibility.
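
A minimal sketch of the "collector" style with simplified stand-in interfaces: the generator is handed a WatermarkOutput and decides itself whether to emit anything, so a punctuated generator simply emits from onEvent and does nothing in onPeriodicEmit. Event, MarkerWatermarkGenerator and RecordingOutput are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical event type: some events act as watermark markers.
final class Event {
    final boolean isMarker;

    Event(boolean isMarker) {
        this.isMarker = isMarker;
    }
}

// Punctuated behaviour: zero or one watermark per event, like a flatMap.
// markIdle() is available on the output, but this generator never needs it.
final class MarkerWatermarkGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (event.isMarker) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: this generator only reacts to marker events.
    }
}

// Test helper that records what the generator emitted.
final class RecordingOutput implements WatermarkOutput {
    final List<Long> watermarks = new ArrayList<>();
    boolean idle;

    @Override
    public void emitWatermark(long watermark) {
        watermarks.add(watermark);
    }

    @Override
    public void markIdle() {
        idle = true;
    }
}
```
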

Regarding checkpointing the watermark and keeping track of the minimum
watermark, this would be the responsibility of the framework (or the
KafkaConsumer in the current implementation). The user-supplied WG
does not need to make sure the watermark doesn't regress.
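
On the framework side, preventing regressions could be as simple as wrapping the output that is handed to the user's generator. A minimal sketch under that assumption (NonRegressingOutput is a hypothetical name, and WatermarkOutput is a simplified stand-in): it forwards only watermarks that are strictly larger than the last forwarded one (a >= check would also work; equal watermarks are dropped here as redundant) and remembers that last value, which is what the framework would checkpoint.

```java
// Simplified stand-in for the FLIP's WatermarkOutput.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

// Framework-side wrapper: user generators may emit anything, downstream never sees a regression.
final class NonRegressingOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;

    NonRegressingOutput(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(long watermark) {
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            downstream.emitWatermark(watermark);
        }
        // Otherwise the watermark would go backwards (or repeat), so it is dropped.
    }

    /** The highest watermark forwarded so far; a candidate for checkpointing. */
    long lastEmitted() {
        return lastEmitted;
    }
}
```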

Regarding making the WG a "rich function", I can see the potential
benefit, but I also see a lot of pitfalls. For example, how should the
watermark state be handled in the case of scale-in? It could be made
to work in the Kafka case by attaching the state to the partition
state that we keep, but then we also have potential backwards
compatibility problems for the WM state. Does the WG usually need to
keep the state, or might it be enough if the state is transient? I.e.,
if you have a restart, the WG would lose its histogram, but it would
rebuild it quickly and you would get back to the same steady state as
before.

Best,
Aljoscha

On 27.04.20 12:12, David Anderson wrote:
Overall I like this proposal; thanks for bringing it forward,
Aljoscha.

I also like the idea of making the watermark generator a rich
function. This should make it more straightforward to implement
smarter watermark generators, e.g. one that uses state to keep
statistics about the actual out-of-orderness and uses those statistics
to implement a variable delay.

David
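
The kind of generator David describes could look roughly like this sketch, which derives its delay from the largest out-of-orderness seen so far. The interfaces are simplified stand-ins, AdaptiveDelayGenerator is a hypothetical name, and the statistic is deliberately simple (a running maximum rather than a histogram). It is kept only in memory, so after a restart it starts again from zero and is rebuilt from new data, which is the "transient state" option Aljoscha raises in his reply.

```java
// Simplified stand-ins for the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical generator whose delay follows the observed out-of-orderness.
// Its statistics are transient: nothing here is checkpointed.
final class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;
    private long observedDelay = 0L; // largest lag of a late event behind maxTimestamp

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
            observedDelay = Math.max(observedDelay, maxTimestamp - eventTimestamp);
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) { // emit nothing before the first event
            output.emitWatermark(maxTimestamp - observedDelay - 1);
        }
    }
}
```

Note that a growing delay can make this generator's own watermark move backwards, so it relies on the caller or framework to keep emitted watermarks non-decreasing.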

On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
Hi Aljoscha,

Thanks for opening the discussion!

I have two comments on the FLIP:
1) We could add lifecycle methods to the Generator, i.e. open()/close(),
probably with a Context as argument. I have not fully thought this
through, but I think that this is more aligned with the rest of our
rich functions. In addition, it would allow us, for example, to
initialize the Watermark value if we decide to checkpoint the
watermark (see [1]). (I also do not know if Table/SQL needs to do
anything in open().)
2) Aligned with the above, and with the case where we want to
checkpoint the watermark in mind, I am wondering how we could
implement this in the future. In the FLIP, it is proposed to expose
the WatermarkOutput in the methods of the WatermarkGenerator. Given
that there is the implicit contract that watermarks are
non-decreasing, WatermarkOutput#emitWatermark() will (I assume) have a
check that compares the last emitted WM against the provided one and
emits it only if it is >=. If not, we risk users shooting themselves
in the foot if they accidentally forget the check. Given that the
WatermarkGenerator and its caller do not know whether the watermark
was finally emitted or not (WatermarkOutput#emitWatermark returns
void), who will be responsible for checkpointing the WM?

Given this, why not define the methods as:

    public interface WatermarkGenerator<T> {

        Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);

        Watermark onPeriodicEmit(WatermarkOutput output);
    }

and the caller will be the one enforcing any invariants, such as
non-decreasing watermarks. In this way, the caller can checkpoint
anything that is needed, as it will have complete knowledge of whether
the WM was emitted or not.
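
Kostas's alternative could be sketched as follows. This assumes that a null return means "nothing to emit" and drops the WatermarkOutput parameter from the signature above for brevity; Watermark, ReturningWatermarkGenerator and GeneratorDriver are hypothetical stand-ins. Because the caller decides what is emitted, it always knows the current watermark, which is the value it would checkpoint.

```java
import java.util.function.LongConsumer;

// Hypothetical immutable watermark value.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Variant of the generator that returns its watermark (null = nothing to emit).
interface ReturningWatermarkGenerator<T> {
    Watermark onEvent(T event, long eventTimestamp);

    Watermark onPeriodicEmit();
}

// Caller that enforces non-decreasing watermarks and tracks what it emitted.
final class GeneratorDriver<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final LongConsumer downstream;
    private long currentWatermark = Long.MIN_VALUE; // what a checkpoint would store

    GeneratorDriver(ReturningWatermarkGenerator<T> generator, LongConsumer downstream) {
        this.generator = generator;
        this.downstream = downstream;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    void onTimer() {
        maybeEmit(generator.onPeriodicEmit());
    }

    long currentWatermark() {
        return currentWatermark;
    }

    private void maybeEmit(Watermark candidate) {
        if (candidate != null && candidate.timestamp > currentWatermark) {
            currentWatermark = candidate.timestamp;
            downstream.accept(candidate.timestamp);
        }
    }
}
```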

What do you think?

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-5601

On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
Thanks for the proposal, Aljoscha. This is a very useful unification.
We have already considered this FLIP in the interfaces for FLIP-95 [1]
and look forward to updating to the new unified watermark generators
once FLIP-126 has been accepted.

Regards,
Timo

[1] https://github.com/apache/flink/pull/11692

On 20.04.20 18:10, Aljoscha Krettek wrote:
Hi Everyone!

We would like to start a discussion on "FLIP-126:
Unify (and
separate)
Watermark Assigners" [1]. This work was started by
Stephan in
an
experimental branch. I expanded on that work to
provide a PoC
for
the
changes proposed in this FLIP: [2].

Currently, we have two different flavours of Watermark Assigners:
AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks.
Both of them extend from TimestampAssigner. This means that sources
that want to support watermark assignment/extraction in the source
need to support two separate interfaces, and we have two operator
implementations for the different flavours. This also makes features
such as generic support for idleness detection more complicated to
implement, because we again have to support two types of watermark
assigners.

In this FLIP we propose two things:

1) Unify the Watermark Assigners into one interface, WatermarkGenerator.
2) Separate this new interface from the TimestampAssigner.

The motivation for the first is to simplify future implementations and
reduce code duplication. The motivation for the second point is again
code deduplication: most assigners currently have to extend from some
base timestamp extractor or duplicate the extraction logic, or users
have to override an abstract method of the watermark assigner to
provide the timestamp extraction logic.
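
To make the "separate" part concrete, a minimal sketch with simplified stand-in interfaces: timestamp extraction and watermark generation are independent objects, so any assigner can be combined with any generator, and the caller first extracts the timestamp and then hands it to the generator. Pipeline.process is a hypothetical stand-in for what a source or operator would do, and passing Long.MIN_VALUE as "no previous record timestamp" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the FLIP's interfaces.
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// A periodic generator that knows nothing about how timestamps are extracted.
final class AscendingTimestampsGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp - 1);
        }
    }
}

final class Pipeline {
    private Pipeline() {}

    // Hypothetical driver: extract first, then hand the timestamp to the generator.
    static <T> List<Long> process(
            List<T> records, TimestampAssigner<T> assigner, WatermarkGenerator<T> generator) {
        List<Long> watermarks = new ArrayList<>();
        WatermarkOutput output = wm -> watermarks.add(wm);
        for (T record : records) {
            long timestamp = assigner.extractTimestamp(record, Long.MIN_VALUE); // no prior timestamp
            generator.onEvent(record, timestamp, output);
        }
        generator.onPeriodicEmit(output); // one periodic tick at the end
        return watermarks;
    }
}
```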

Additionally, we propose to add a generic wrapping WatermarkGenerator
that provides idleness detection, i.e. it can mark a stream/partition
as idle if no data arrives after a configured timeout.
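
A minimal sketch of such a wrapper under a few assumptions: the interfaces are simplified stand-ins, time comes from an injected clock so the logic is testable, and "idle" means that no event arrived during the last idleTimeout milliseconds when a periodic emit happens. IdlenessWrapper and its constructor parameters are hypothetical names, not the FLIP's API.

```java
import java.util.function.LongSupplier;

// Simplified stand-ins for the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Wraps any generator and marks the output idle when no events arrived within the timeout.
final class IdlenessWrapper<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> wrapped;
    private final LongSupplier clock; // e.g. System::currentTimeMillis in production
    private final long idleTimeout;
    private long lastActivity;

    IdlenessWrapper(WatermarkGenerator<T> wrapped, LongSupplier clock, long idleTimeout) {
        this.wrapped = wrapped;
        this.clock = clock;
        this.idleTimeout = idleTimeout;
        this.lastActivity = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastActivity = clock.getAsLong();
        wrapped.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastActivity >= idleTimeout) {
            output.markIdle();
        } else {
            wrapped.onPeriodicEmit(output);
        }
    }
}
```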

The "unify and separate" part refers to the fact that we want to unify
punctuated and periodic assigners but at the same time split the
timestamp assigner from the watermark generator.

Please find more details in the FLIP [1]. Looking forward to your
feedback.

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
[2] https://github.com/aljoscha/flink/tree/stephan-event-time







