> However, my major concern after reading this FLIP is that the current
> design might be too complicated. It tries to take all possible kinds of events
> (timestamp watermark, end-of-data, end-of-partition, internal watermark,
> and arbitrary user-defined watermarks) into consideration, which
> complicates the design when it comes to serialization and propagation.

> IMHO, this feature, the ability to send custom events across operators
> along the data streams, has the potential to become a major differentiator
> of DataStream API V2 compared to V1. For such a feature, I don't think
> it's feasible to design everything properly at the very beginning without
> enough user feedback. I'd suggest starting with a smaller scope and building
> the feature incrementally as new demands and feedback arise.

I think there is a misunderstanding. The scope of this FLIP is not to
encapsulate all events under generalized watermarks.
Instead, it encapsulates timestamp watermarks and enables users and the
runtime to generate and send custom generalized watermarks.
I updated the FLIP accordingly [1].

> Concrete use cases are usually helpful for designing such a general
> mechanism. You may examine the design by trying to use it to fulfill the
> demands from the use cases. In case you are looking for such use cases in
> addition to the event-time watermarks, here are some inputs.
> - In FLIP-309/327/329 [1-3], we proposed to detect the data freshness from
> source, and use that information for various improvements. In DataStream
> API V1, such information is carried by RecordAttributes, which is quite
> similar to the generalized watermark except that we do not allow defining
> arbitrary custom attributes.
> - In Flink CDC, there are two phases: scanning the table at a certain
> point in time, and consuming the binlog from that point onward. In the first
> phase, there's only +I but no +U/-U/-D in the changelog, and downstream
> operators can do many optimizations based on that information. We haven't
> brought those optimizations to the community because that requires the
> runtime layer to understand the concept of table/SQL changelogs. If we can
> send custom events across operators without requiring the runtime to
> understand those events, the problem would be solved.
> - In a processing-time temporal join, the join operator does not wait for
> the build side to complete before consuming the probe side data. This is
> because the build side is continuously updated and the join operator does
> not know when the initial build is finished. As a result, initial data
> from the probe side that should be joined with initial data from the
> build side is missed. If we can send a signal from the build-side source
> to the join operator, notifying it that the initial build is complete, the
> problem would be solved. Similar to the previous case, such information
> should not need to be understood by the runtime layer.

- As mentioned, the scope of this FLIP is not to bring events other than
time-based watermarks into the generalized watermark framework,
but to establish a framework that supports customized generalized watermarks.
I added the above use cases to the FLIP as motivation for such a framework.

> I think we should at least include the following information:
> - non-data events / records / indicators
> - flow along the data streams
> - can be generated / handled by process functions, connectors, and the
> framework
> - may need to be aligned across parallel data streams
- Definitely agree. I think we will do these specializations once we
integrate other events into our framework (which is out of the scope of
this FLIP).

> Requiring users to always implement serialization for custom watermarks
> might be a bit too heavy. Alternatively, we may consider only supporting
> primitive types for watermarks, e.g., BOOLEAN, LONG, etc. If complex types
> prove necessary in the future, we can introduce STRING or BYTES so that
> users can do custom serde by themselves.

> Another benefit of using primitive types is that it simplifies the
> alignment semantics. Currently in this FLIP, users are required to
> implement a WatermarkCombiner, which is not trivial. If we only support
> limited types, we can (maybe only) provide built-in combiners for users,
> e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG, etc. Combiners for
> STRING and BYTES are a bit more complicated; that's why I don't recommend
> supporting them in the first version.

- Agreed. I added a section on predefined watermark types to the FLIP
and gave an example for LONG [2].
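The built-in combiners mentioned above (ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG) could look roughly like this. It is a hypothetical sketch in plain Java (17+), not the `WatermarkCombiner` interface from the FLIP: each `AlignedCombiner` keeps the latest value per parallel input channel and emits a combined value only once every channel has reported. It deliberately ignores concerns such as idle channels or enforcing monotonic progress.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch, not Flink's API: built-in combiners for primitive watermarks
// that are aligned across the parallel input channels of one operator.
public class BuiltInCombiners {

    // ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG.
    static final BinaryOperator<Boolean> ALL = (a, b) -> a && b;
    static final BinaryOperator<Boolean> ANY = (a, b) -> a || b;
    static final BinaryOperator<Long> GREATEST = Long::max;
    static final BinaryOperator<Long> LEAST = Long::min;

    // Tracks the latest value per input channel and combines them once all channels reported.
    static final class AlignedCombiner<T> {
        private final Object[] latest;
        private final BinaryOperator<T> combine;

        AlignedCombiner(int numChannels, BinaryOperator<T> combine) {
            this.latest = new Object[numChannels];
            this.combine = combine;
        }

        // Returns the combined value, or null while some channel has not reported yet.
        @SuppressWarnings("unchecked")
        T onWatermark(int channel, T value) {
            latest[channel] = value;
            T result = null;
            for (Object v : latest) {
                if (v == null) {
                    return null;
                }
                result = (result == null) ? (T) v : combine.apply(result, (T) v);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        // Event-time-like LONG watermark: downstream progress is the LEAST across channels.
        AlignedCombiner<Long> time = new AlignedCombiner<>(2, LEAST);
        System.out.println(time.onWatermark(0, 100L)); // null (channel 1 not seen yet)
        System.out.println(time.onWatermark(1, 80L));  // 80
        System.out.println(time.onWatermark(1, 120L)); // 100

        // BOOLEAN "initial build finished" watermark: true only when ALL channels say so.
        AlignedCombiner<Boolean> done = new AlignedCombiner<>(2, ALL);
        done.onWatermark(0, true);
        System.out.println(done.onWatermark(1, false)); // false
        System.out.println(done.onWatermark(1, true));  // true
    }
}
```

Because the combine function is fixed per type, users only pick a combiner by name instead of implementing alignment logic themselves, which is the simplification the comment argues for.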

IMHO, the above answers also address your other comments. In addition, I
agree with your comments about connectors/sources being able to send
generalized watermarks.
I also added support for this in the FLIP [3].

Thanks again for your feedback. Hope my replies answer your questions.



