Re: [DISCUSS] FLIP-467: Introduce Generalized Watermarks
Hi Xintong,

Thanks for your comments.

> However, my major concern after reading this FLIP is that the current
> design might be too complicated. It tries to take all possible kinds of events
> (timestamp watermark, end-of-data, end-of-partition, internal watermark,
> and arbitrary user-defined watermarks) into consideration, which
> complicates the design when it comes to serialization and propagation.
>
> IMHO, this feature, the ability to send custom events across operators
> along the data streams, has the potential to become a major differentiator
> of DataStream API V2 compared to V1. For such a feature, I don't think
> it's feasible to design everything properly at the very beginning without
> enough user feedback. I'd suggest starting with a smaller scope and building
> the feature incrementally as new demands and feedback arise.

I think there is a misunderstanding. The scope of this FLIP is not to encapsulate all events under generalized watermarks. Instead, it encapsulates timestamp watermarks and enables users/runtime to generate/send custom generalized watermarks. I updated the FLIP accordingly [1].

> Concrete use cases are usually helpful for designing such a general
> mechanism. You may examine the design by trying to use it to fulfill the
> demands of the use cases. In case you are looking for such use cases in
> addition to event-time watermarks, here are some inputs.
> - In FLIP-309/327/329 [1-3], we proposed detecting data freshness from
>   the source and using that information for various improvements. In DataStream
>   API V1, such information is carried by RecordAttributes, which is quite
>   similar to the generalized watermark except that we do not allow defining
>   arbitrary custom attributes.
> - In Flink CDC, there are two phases: scanning the table at a certain point
>   in time, and consuming the binlog from that point in time. In the first phase,
>   there's only +I but no +U/-U/-D in the changelog, and downstream operators
>   can do many optimizations based on that information. We haven't brought those
>   optimizations to the community, because that requires the runtime layer to
>   understand the concept of table / SQL changelogs. If we can send custom
>   events across operators without requiring the runtime to understand those
>   events, the problem would be solved.
> - In processing-time temporal join, the join operator does not wait for the
>   build side to complete before consuming the probe side data. This is
>   because the build side is continuously updated, and the join operator does
>   not know when the initial build is finished. As a result, initial
>   data from the probe side that should be joined with initial data from the
>   build side is missed. If we can send a signal from the build-side source
>   to the join operator, notifying it of the completion of the initial build, the
>   problem would be solved. Similar to the previous case, such information
>   should not be understood by the runtime layer.

- As mentioned, the scope of this FLIP is not to bring other events (except time-based watermarks) into the generalized watermark framework, but to establish a framework that supports customization of generalized watermarks. I added the above use cases to the FLIP as motivation for such a framework.

> I think we should at least include the following information:
> - non-data events / records / indicators
> - flow along the data streams
> - can be generated / handled by process functions, connectors, and the
>   framework
> - may need to be aligned across parallel data streams

- Definitely agree. I think we will do these specializations once we integrate other events into our framework (which is out of the scope of this FLIP).

> Requiring users to always implement serializations for custom watermarks
> might be a bit too heavy. Alternatively, we may consider only supporting
> primitive types for Watermarks, e.g., BOOLEAN, LONG, etc. If complex types
> are proved necessary in the future, we can introduce STRING or BYTES so that
> users can do custom serde by themselves.
>
> Another benefit of using primitive types is that it simplifies the
> alignment semantics. Currently in this FLIP, users are required to
> implement a WatermarkCombiner, which is not trivial. If we only support
> limited types, we can (maybe only) provide built-in combiners for users,
> e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG, etc. Combiners for
> STRING and BYTES are a bit more complicated, which is why I don't recommend
> supporting them in the first version.

- Agreed. As an example, I added a section on predefined watermarks (e.g., LONG) to the FLIP and gave an example for LONG [2].

IMHO, the above answers also address your other comments.

In addition, I agree with your comments about connectors/sources being able to send generalized watermarks. I also added support for this in the FLIP [3].

Thanks again for your feedback. I hope my replies answer your questions.

Regards,
Jeyhun

[1]
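The primitive-type watermarks and built-in combiners discussed above can be sketched in Java to show how alignment across parallel input channels could work without a user-written WatermarkCombiner. This is a minimal sketch under stated assumptions: `BuiltInCombiners` and `WatermarkAligner` are hypothetical names chosen for illustration, not the API proposed in the FLIP and not existing Flink classes, and the sketch assumes an aligned value is produced only after every parallel channel has reported at least once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BinaryOperator;

/** Hypothetical built-in combiners for primitive-typed watermarks (illustration only, not a Flink API). */
final class BuiltInCombiners {
    static final BinaryOperator<Boolean> ALL = Boolean::logicalAnd;
    static final BinaryOperator<Boolean> ANY = Boolean::logicalOr;
    static final BinaryOperator<Long> LEAST = Long::min;
    static final BinaryOperator<Long> GREATEST = Long::max;

    private BuiltInCombiners() {}
}

/**
 * Hypothetical aligner: keeps the latest watermark from each parallel input channel
 * and combines them with a built-in combiner once every channel has reported.
 */
final class WatermarkAligner<T> {
    private final List<T> latest; // a null entry means the channel has not reported yet
    private final BinaryOperator<T> combiner;

    WatermarkAligner(int numChannels, BinaryOperator<T> combiner) {
        this.latest = new ArrayList<>(Collections.<T>nCopies(numChannels, null));
        this.combiner = Objects.requireNonNull(combiner);
    }

    /** Records the channel's latest value; returns the aligned value, or empty while some channel is silent. */
    Optional<T> onWatermark(int channel, T value) {
        latest.set(channel, Objects.requireNonNull(value));
        if (latest.contains(null)) {
            return Optional.empty();
        }
        return latest.stream().reduce(combiner);
    }
}

/** Demo: LEAST mimics event-time alignment, ANY combines a boolean signal. */
class AlignerDemo {
    public static void main(String[] args) {
        WatermarkAligner<Long> eventTime = new WatermarkAligner<>(2, BuiltInCombiners.LEAST);
        System.out.println(eventTime.onWatermark(0, 100L)); // Optional.empty
        System.out.println(eventTime.onWatermark(1, 80L));  // Optional[80]
        System.out.println(eventTime.onWatermark(1, 120L)); // Optional[100]

        WatermarkAligner<Boolean> signal = new WatermarkAligner<>(2, BuiltInCombiners.ANY);
        System.out.println(signal.onWatermark(0, false));   // Optional.empty
        System.out.println(signal.onWatermark(1, true));    // Optional[true]
    }
}
```

The choice to withhold any aligned value until all channels have reported mirrors how a LEAST-combined event-time watermark cannot advance past a channel that has not emitted one yet; with only a handful of fixed combiners, users would pick one instead of implementing the combining logic themselves.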
Re: [DISCUSS] FLIP-467: Introduce Generalized Watermarks
Hi Jeyhun,

Thanks for working on this FLIP. In general, I think it's a good idea to generalize the concept of Watermark to represent not only the advancing of event time, but also general indicators / events / signals that need to be passed along the data streams. So +1 for working towards this direction.

However, my major concern after reading this FLIP is that the current design might be too complicated. It tries to take all possible kinds of events (timestamp watermark, end-of-data, end-of-partition, internal watermark, and arbitrary user-defined watermarks) into consideration, which complicates the design when it comes to serialization and propagation.

IMHO, this feature, the ability to send custom events across operators along the data streams, has the potential to become a major differentiator of DataStream API V2 compared to V1. For such a feature, I don't think it's feasible to design everything properly at the very beginning without enough user feedback. I'd suggest starting with a smaller scope and building the feature incrementally as new demands and feedback arise. To be specific, I'd suggest the following:

1. Only focus on user-facing events, i.e., Watermarks that are either generated or handled by user code (process functions and connectors). Refactoring existing internal events does not bring any benefit to users, and may even destabilize existing mechanisms. We could do that incrementally after the generalized watermark mechanism becomes stable.
2. Start with a limited set of supported data types and propagation strategies. We can add support for arbitrary types and strategies later, if proved necessary. By that time, we should be able to better understand the use cases based on real feedback.
3. Try to minimize the set of concepts and APIs that users need to understand and work with, and make them simple and easy to understand.

I'm not saying we should not discuss designs of internal implementations in this FLIP.
It's just that it would be easier to understand the FLIP if it first presented how users should understand and use the feature, and then the key internal designs needed to achieve that.

# Some detailed suggestions

## Use cases

Concrete use cases are usually helpful for designing such a general mechanism. You may examine the design by trying to use it to fulfill the demands of the use cases. In case you are looking for such use cases in addition to event-time watermarks, here are some inputs.

- In FLIP-309/327/329 [1-3], we proposed detecting data freshness from the source and using that information for various improvements. In DataStream API V1, such information is carried by RecordAttributes, which is quite similar to the generalized watermark except that we do not allow defining arbitrary custom attributes.
- In Flink CDC, there are two phases: scanning the table at a certain point in time, and consuming the binlog from that point in time. In the first phase, there's only +I but no +U/-U/-D in the changelog, and downstream operators can do many optimizations based on that information. We haven't brought those optimizations to the community, because that requires the runtime layer to understand the concept of table / SQL changelogs. If we can send custom events across operators without requiring the runtime to understand those events, the problem would be solved.
- In processing-time temporal join, the join operator does not wait for the build side to complete before consuming the probe side data. This is because the build side is continuously updated, and the join operator does not know when the initial build is finished. As a result, initial data from the probe side that should be joined with initial data from the build side is missed. If we can send a signal from the build-side source to the join operator, notifying it of the completion of the initial build, the problem would be solved. Similar to the previous case, such information should not be understood by the runtime layer.
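The temporal join use case above can be illustrated with a minimal Java sketch of what the join operator could do once such a signal exists. It assumes a hypothetical per-channel "initial build finished" signal delivered to the operator; `BufferingTemporalJoin` and its methods are illustrative names only, not Flink APIs or the FLIP's design. The operator buffers probe-side records until every parallel build-side channel has sent the signal (an ALL-style alignment), so early probe records are no longer joined against an incomplete build side.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical processing-time temporal join that holds back probe-side records
 * until every parallel build-side channel has signaled the end of its initial build.
 */
final class BufferingTemporalJoin {
    private final int numBuildChannels;
    private final BitSet finishedChannels = new BitSet();
    private final Map<String, String> buildTable = new HashMap<>();
    private final List<String> bufferedProbeKeys = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    BufferingTemporalJoin(int numBuildChannels) {
        this.numBuildChannels = numBuildChannels;
    }

    /** ALL-style alignment: the initial build is done only when every channel has signaled. */
    private boolean initialBuildDone() {
        return finishedChannels.cardinality() == numBuildChannels;
    }

    void onBuildRecord(String key, String value) {
        buildTable.put(key, value);
    }

    /** Hypothetical custom signal from one build-side channel; repeated signals are idempotent. */
    void onInitialBuildFinished(int channel) {
        finishedChannels.set(channel);
        if (initialBuildDone()) {
            // Flush the probe records that arrived before the build side was ready.
            for (String key : bufferedProbeKeys) {
                emit(key);
            }
            bufferedProbeKeys.clear();
        }
    }

    void onProbeRecord(String key) {
        if (initialBuildDone()) {
            emit(key);
        } else {
            bufferedProbeKeys.add(key);
        }
    }

    /** Inner-join semantics: a probe key without a build-side match produces no output. */
    private void emit(String key) {
        String match = buildTable.get(key);
        if (match != null) {
            output.add(key + "=" + match);
        }
    }

    List<String> output() {
        return output;
    }
}

class TemporalJoinDemo {
    public static void main(String[] args) {
        BufferingTemporalJoin join = new BufferingTemporalJoin(2);
        join.onProbeRecord("k1");          // buffered: build side not ready yet
        join.onBuildRecord("k1", "v1");
        join.onInitialBuildFinished(0);
        join.onBuildRecord("k2", "v2");
        join.onInitialBuildFinished(1);    // all channels done, buffer is flushed
        join.onProbeRecord("k2");          // joined immediately
        System.out.println(join.output()); // [k1=v1, k2=v2]
    }
}
```

Without the buffering, `k1` would have been probed before `v1` arrived and its match would have been lost, which is the problem described in the use case. Note that the operator only needs to interpret the signal itself; the runtime merely has to deliver it.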
## Watermark Definition

The FLIP defines the new generalized Watermark as "indicators in data streams", which is a bit too general. I think we should at least include the following information:

- non-data events / records / indicators
- flow along the data streams
- can be generated / handled by process functions, connectors, and the framework
- may need to be aligned across parallel data streams

## Types of Watermarks

Requiring users to always implement serializations for custom watermarks might be a bit too heavy. Alternatively, we may consider only supporting primitive types for Watermarks, e.g., BOOLEAN, LONG, etc. If complex types are proved necessary in the future, we can introduce STRING or BYTES so that users can do custom serde by themselves.

Another benefit of using primitive types is that it simplifies the alignment semantics. Currently in this FLIP, users are required to implement a WatermarkCombiner, which is not trivial. If we only support limited types, we can (maybe only) provide built-in combiners for users, e.g., ALL / ANY for BOOLEAN, GREATEST / LEAST for LONG, etc. Combiners for STRING and BYTES are a bit more complicated, which is why I don't recommend supporting them in the first version.
[DISCUSS] FLIP-467: Introduce Generalized Watermarks
Hi devs,

I'd like to start a discussion about FLIP-467: Introduce Generalized Watermarks [1]. This is another sub-FLIP of DataStream API V2 [2].

After this FLIP, one can declare generalized (custom) watermarks and define their custom propagation and alignment process. This FLIP opens up opportunities to simplify signaling mechanisms inside the Flink runtime and, at the same time, enables new use cases.

You can find more details in the FLIP [1].

Looking forward to hearing your comments, thanks!

Best regards,
Jeyhun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-467%3A+Introduce+Generalized+Watermarks
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-408%3A+%5BUmbrella%5D+Introduce+DataStream+API+V2