Re: [DISCUSS] FLIP-467: Introduce Generalized Watermarks

2024-07-12 Thread Jeyhun Karimov
Hi Xintong,

Thanks for your comments.

> However, my major concern after reading this FLIP is that the current
> design might be too complicated. It tries to take all possible kinds of events
> (timestamp watermark, end-of-data, end-of-partition, internal watermark,
> and arbitrary user defined watermarks) into consideration, which
> complicates the design when it comes to serialization and propagation.


> IMHO, this feature, the ability to send custom events across operators
> along the data streams, has the potential to become a major differentiator
> of DataStream API V2 compared to V1. For such a feature, I don't think
> it's feasible to design everything properly at the very beginning without
> enough user feedback. I'd suggest starting with a smaller scope and building
> the feature incrementally as new demands and feedback arise.


I think there is a misunderstanding. The scope of this FLIP is not to
encapsulate all events under generalized watermarks.
Instead, it encapsulates timestamp watermarks and enables users/runtime to
generate/send custom generalized watermarks.
I updated the FLIP accordingly [1].


> Concrete use cases are usually helpful for designing such a general
> mechanism. You may examine the design by trying to use it to fulfill the
> demands from the use cases. In case you are looking for such use cases in
> addition to the event-time watermarks, here are some inputs.
> - In FLIP-309/327/329 [1-3], we proposed to detect the data freshness from
> source, and use that information for various improvements. In DataStream
> API V1, such information is carried by RecordAttributes, which is quite
> similar to the generalized watermark except that we do not allow defining
> arbitrary custom attributes.
> - In Flink CDC, there are two phases: scanning the table at a certain time
> point, and consuming the binlog from that time point. In the first phase,
> there's only +I but no +U/-U/-D in the changelog, and downstream operators
> can do many optimizations based on that information. We haven't brought those
> optimizations to the community, because that requires the runtime layer to
> understand the concept of table / sql changelogs. If we can send custom
> events across operators, without requiring the runtime to understand those
> events, the problem would be solved.
> - In processing-time temporal join, the join operator does not wait for the
> build side to complete before consuming the probe side data. This is
> because the build side is continuously updated and the join operator does
> not know when the initial build is finished. The result is that, initial
> data from the probe side that should be joined with initial data from the
> build side are missed. If we can send a signal from the build side source
> to the join operator, notifying about the completion of initial build, the
> problem would be solved. Similar to the previous case, such information
> should not be understood by the runtime layer.


- As mentioned, the scope of this FLIP is not to bring other events (apart
from time-based watermarks) into the generalized watermark framework,
but to establish a framework that supports customization of generalized
watermarks. I added the above use cases to the FLIP as motivation for such
a framework.

> I think we should at least include the following information:
> - non-data events / records / indicators
> - flow along the data streams
> - can be generated / handled by process functions, connectors, and the
> framework
> - may need to be aligned across parallel data streams


- Definitely agree. I think we will do these specializations once we
integrate other events into our framework (which is out of the scope of
this FLIP).

> Requiring users to always implement serializations for custom watermarks
> might be a bit too heavy. Alternatively, we may consider only supporting
> primitive types for Watermarks, i.e., BOOLEAN, LONG, etc. If complex types
> prove necessary in the future, we can introduce STRING or BYTES so that
> users can do custom serde by themselves.


> Another benefit of using primitive types is that it simplifies the
> alignment semantics. Currently in this FLIP, users are required to
> implement a WatermarkCombiner, which is not trivial. If we only support
> limited types, we can (maybe only) provide built-in combiners for users,
> e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG, etc. Combiners for
> STRING and BYTES are a bit more complicated, that's why I don't recommend
> supporting them in the first version.


- Agreed. I added a section on predefined watermarks to the FLIP, with an
example for LONG [2].
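
To make the built-in combiner idea concrete, here is a minimal sketch of what
ALL / ANY for BOOLEAN and GREATEST / LEAST for LONG could look like. The names
BuiltInCombiners and combine are hypothetical and are not taken from the FLIP;
the sketch only assumes that the runtime keeps the latest watermark value
received from each input channel and folds those values into one.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration only: these names are not part of the FLIP or of Flink.
final class BuiltInCombiners {
    // BOOLEAN combiners: ALL is true only if every channel reported true,
    // ANY is true as soon as one channel reported true.
    static final BinaryOperator<Boolean> ALL = (a, b) -> a && b;
    static final BinaryOperator<Boolean> ANY = (a, b) -> a || b;

    // LONG combiners: pick the largest or the smallest value across channels.
    static final BinaryOperator<Long> GREATEST = Long::max;
    static final BinaryOperator<Long> LEAST = Long::min;

    private BuiltInCombiners() {}

    // Folds the latest watermark value of each input channel into one value.
    static <T> T combine(List<T> latestPerChannel, BinaryOperator<T> combiner) {
        if (latestPerChannel.isEmpty()) {
            throw new IllegalArgumentException("at least one input channel is required");
        }
        T result = latestPerChannel.get(0);
        for (int i = 1; i < latestPerChannel.size(); i++) {
            result = combiner.apply(result, latestPerChannel.get(i));
        }
        return result;
    }
}
```

With this shape, LEAST over the per-channel values corresponds to the way
event-time watermarks are aligned today (the minimum across inputs), while
ALL would fit a boolean "every input has finished some phase" signal.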

IMHO, the above answers also address your other comments. In addition, I
agree with your comments about connectors/source being able to send
generalized watermarks.
I also added support for this in the FLIP [3].

Thanks again for your feedback. Hope my replies answer your questions.

Regards,
Jeyhun

[1]

Re: [DISCUSS] FLIP-467: Introduce Generalized Watermarks

2024-07-06 Thread Xintong Song
Hi Jeyhun,

Thanks for working on this FLIP.

In general, I think it's a good idea to generalize the concept of Watermark
so that it represents not only the advancing of event time, but also general
indicators / events / signals that need to be passed along the data
streams. So +1 for working in this direction.

However, my major concern after reading this FLIP is that the current
design might be too complicated. It tries to take all possible kinds of events
(timestamp watermark, end-of-data, end-of-partition, internal watermark,
and arbitrary user defined watermarks) into consideration, which
complicates the design when it comes to serialization and propagation.

IMHO, this feature, the ability to send custom events across operators
along the data streams, has the potential to become a major differentiator
of DataStream API V2 compared to V1. For such a feature, I don't think
it's feasible to design everything properly at the very beginning without
enough user feedback. I'd suggest starting with a smaller scope and building
the feature incrementally as new demands and feedback arise.

To be specific, I'd suggest the following:
1. Only focus on user-facing events, i.e., Watermarks that are either
generated or handled by user code (process functions and connectors).
Refactoring existing internal events does not bring any benefit to users,
and may even destabilize existing mechanisms. We could do that incrementally
after the generalized watermark mechanism becomes stable.
2. Start with a limited set of supported data types and propagation
strategies. We can add support for arbitrary types and strategies later, if
they prove necessary. By that time, we should be able to better understand the
use cases based on real feedback.
3. Try to minimize the set of concepts and APIs that users need to
understand and work with, and make them simple and easy to understand. I'm
not saying we should not discuss designs of internal implementations in
this FLIP. It would just be easier to understand the FLIP if it presents
first how users should understand and use the feature, then the key
internal designs in order to achieve that.

# Some detailed suggestions

## Use cases

Concrete use cases are usually helpful for designing such a general
mechanism. You may examine the design by trying to use it to fulfill the
demands from the use cases. In case you are looking for such use cases in
addition to the event-time watermarks, here are some inputs.
- In FLIP-309/327/329 [1-3], we proposed to detect the data freshness from
source, and use that information for various improvements. In DataStream
API V1, such information is carried by RecordAttributes, which is quite
similar to the generalized watermark except that we do not allow defining
arbitrary custom attributes.
- In Flink CDC, there are two phases: scanning the table at a certain time
point, and consuming the binlog from that time point. In the first phase,
there's only +I but no +U/-U/-D in the changelog, and downstream operators
can do many optimizations based on that information. We haven't brought those
optimizations to the community, because that requires the runtime layer to
understand the concept of table / sql changelogs. If we can send custom
events across operators, without requiring the runtime to understand those
events, the problem would be solved.
- In processing-time temporal join, the join operator does not wait for the
build side to complete before consuming the probe side data. This is
because the build side is continuously updated and the join operator does
not know when the initial build is finished. The result is that, initial
data from the probe side that should be joined with initial data from the
build side are missed. If we can send a signal from the build side source
to the join operator, notifying about the completion of initial build, the
problem would be solved. Similar to the previous case, such information
should not be understood by the runtime layer.
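
As a rough illustration of the temporal join case, the sketch below shows a
join that buffers probe-side records until it receives a custom "initial
build complete" signal from the build side, and only then emits results. All
names (InitialBuildJoinSketch, onBuildCompleteSignal, and so on) are
hypothetical and are not an existing Flink API; the sketch assumes the signal
is delivered to the operator as a plain callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical illustration only: not a Flink operator and not part of the FLIP.
final class InitialBuildJoinSketch {
    private final Map<String, String> buildSide = new HashMap<>();
    private final Queue<String[]> bufferedProbe = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private boolean initialBuildDone = false;

    // Build-side records always update the lookup state.
    void onBuildRecord(String key, String value) {
        buildSide.put(key, value);
    }

    // Probe-side records are held back until the initial build has finished.
    void onProbeRecord(String key, String value) {
        if (!initialBuildDone) {
            bufferedProbe.add(new String[] {key, value});
            return;
        }
        join(key, value);
    }

    // Called when the custom signal from the build-side source arrives.
    void onBuildCompleteSignal() {
        initialBuildDone = true;
        while (!bufferedProbe.isEmpty()) {
            String[] record = bufferedProbe.poll();
            join(record[0], record[1]);
        }
    }

    // Inner join: emit a result only if the build side has a matching key.
    private void join(String key, String probeValue) {
        String buildValue = buildSide.get(key);
        if (buildValue != null) {
            output.add(key + ":" + probeValue + "+" + buildValue);
        }
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch a probe record that arrives before its build-side match is no
longer dropped: it stays in the buffer and is joined once the signal arrives.
The runtime only has to forward the signal and does not need to know what it
means.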

## Watermark Definition

The FLIP defines the new generalized Watermark as "indicators in data
streams", which is a bit too general.

I think we should at least include the following information:
- non-data events / records / indicators
- flow along the data streams
- can be generated / handled by process functions, connectors, and the
framework
- may need to be aligned across parallel data streams

## Types of Watermarks

Requiring users to always implement serializations for custom watermarks
might be a bit too heavy. Alternatively, we may consider only supporting
primitive types for Watermarks, i.e., BOOLEAN, LONG, etc. If complex types
prove necessary in the future, we can introduce STRING or BYTES so that
users can do custom serde by themselves.

Another benefit of using primitive types is that it simplifies the
alignment semantics. Currently in this FLIP, users are required to
implement a WatermarkCombiner, which is not trivial. If we only support
limited types, we can (maybe only) provide built-in combiners for users,
e.g., 

[DISCUSS] FLIP-467: Introduce Generalized Watermarks

2024-07-02 Thread Jeyhun Karimov
Hi devs,

I'd like to start a discussion about FLIP-467: Introduce Generalized
Watermarks [1].
This is another sub-FLIP of DataStream API V2 [2].


After this FLIP, one can declare generalized (custom) watermarks and define
their custom propagation and alignment process. This FLIP opens new
prospects for simplifying signaling mechanisms inside the Flink runtime and
at the same time reveals new use cases.

You can find more details in the FLIP [1].
Looking forward to hearing your comments, thanks!


Best regards,
Jeyhun

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2