Hi all,

Thanks for all the feedback and suggestions so far. If there are no further comments, we will open the voting thread tomorrow, Aug 6, 2024.

Best,
Xia

On Thu, Aug 1, 2024 at 22:18, Lincoln Lee wrote:
> Hi Xia,
>
> Thanks for your updates! Looks good to me.
>
> Best,
> Lincoln Lee
>
>
> On Thu, Aug 1, 2024 at 11:15, Xia Sun wrote:
> > Hi Lincoln,
> >
> > Thanks for your detailed explanation. I understand your concern. Introducing configuration options with redundant semantics can indeed confuse users, and the engine should minimize users' exposure to these details. Based on this premise, while still ensuring that users can choose to enable the broadcast hash join optimization at either compile time or runtime, I think we can introduce a new configuration option `table.optimizer.adaptive-broadcast-join.strategy` and reuse the existing option `table.optimizer.join.broadcast-threshold` as a unified threshold for determining the broadcast hash join optimization. The `table.optimizer.adaptive-broadcast-join.strategy` option would be of an enumeration type with three values:
> >
> > AUTO: Flink will autonomously select the optimal timing for the optimization.
> > RUNTIME_ONLY: The broadcast hash join optimization will only be performed at runtime.
> > NONE: The broadcast hash join optimization will only be performed at the compile phase.
> >
> > AUTO will be the default value.
> >
> > I have also updated this information in the FLIP, PTAL.
> >
> > Best,
> > Xia
> >
> > On Tue, Jul 30, 2024 at 23:39, Lincoln Lee wrote:
> > > Thanks Xia for your explanation!
> > >
> > > I can understand your concern, but considering the design of this FLIP, which already covers compile-time inaccurate optimization for runtime de-optimization, is it necessary to make the user manually turn off 'table.optimizer.join.broadcast-threshold' and set the new 'table.optimizer.adaptive.join.broadcast-threshold' again?
> > > Another option is that users only need to focus on the existing broadcast size threshold: they accept the reality that 100% accurate optimization cannot be done at compile time, adopt the new capability of dynamic optimization at runtime, and ultimately trust that Flink will always optimize accurately. From this point of view, I would prefer a generic parameter 'table.optimizer.adaptive-optimization.enabled', which would allow for more dynamic optimization in the future, not limited to broadcast join scenarios, and would not keep bringing in new options. WDYT?
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > On Tue, Jul 30, 2024 at 11:27, Xia Sun wrote:
> > > > Hi Lincoln,
> > > >
> > > > Thank you for your input and participation in the discussion!
> > > >
> > > > Compared to introducing the 'table.optimizer.adaptive-join.enabled' option, introducing 'table.optimizer.adaptive.join.broadcast-threshold' can also cover the need to disable static broadcast optimization while enabling only dynamic broadcast optimization. From this perspective, introducing a new threshold configuration might be more appropriate. What do you think?
> > > >
> > > > Best,
> > > > Xia
> > > >
> > > > On Mon, Jul 29, 2024 at 23:12, Lincoln Lee wrote:
> > > > > +1 for this useful optimization!
> > > > >
> > > > > I have a question about the new option: do we really need two broadcast join thresholds?
> > > > > IIUC, this adaptive broadcast join is a complement to compile-time optimization, so there is no need for the user to configure two different thresholds (other than -1, which represents "off"). Since we just want to control the adaptive optimization itself, should we provide a configuration option like 'table.optimizer.adaptive-join.enabled', or a more general one like 'table.optimizer.adaptive-optimization.enabled', for such related optimizations?
> > > > >
> > > > >
> > > > > Best,
> > > > > Lincoln Lee
> > > > >
> > > > >
> > > > > On Fri, Jul 26, 2024 at 11:59, Ron Liu wrote:
> > > > > > Hi, Xia
> > > > > >
> > > > > > Thanks for your reply. It looks good to me.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Ron
> > > > > >
> > > > > > On Fri, Jul 26, 2024 at 10:49, Xia Sun wrote:
> > > > > > > Hi Ron,
> > > > > > >
> > > > > > > Thanks for your feedback!
> > > > > > >
> > > > > > > -> creation of the join operators until runtime
> > > > > > >
> > > > > > >
> > > > > > > That means when creating the AdaptiveJoinOperatorFactory, we will not immediately create the JoinOperator. Instead, we only pass in the necessary parameters for creating the JoinOperator. The appropriate JoinOperator will be created during the StreamGraphOptimizationStrategy optimization phase.
> > > > > > >
> > > > > > > As you mentioned, the runtime's visibility into the table planner is indeed an issue. It includes two aspects:
> > > > > > > (1) We plan to place the implementations of both the AdaptiveBroadcastJoinOptimizationStrategy and the AdaptiveJoinOperatorFactory in the table layer. During the runtime phase, we will obtain the AdaptiveBroadcastJoinOptimizationStrategy through class loading.
> > > > > > > Therefore, flink-runtime does not need to be aware of the table layer's implementation.
> > > > > > > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs to be aware of the table planner, we will consider placing the AdaptiveJoinOperatorFactory in the table planner module as well.
> > > > > > >
> > > > > > >
> > > > > > > -> When did you configure these optimization strategies uniformly into `execution.batch.adaptive.stream-graph-optimization.strategies`
> > > > > > >
> > > > > > >
> > > > > > > Thank you for pointing out this issue. When there are multiple StreamGraphOptimizationStrategies, the optimization order in the runtime phase will strictly follow the order specified in the configuration option `execution.batch.adaptive.stream-graph-optimization.strategies`. Therefore, a unified configuration is needed during the SQL planner phase to ensure the correct optimization order. Currently, we are considering performing this unified configuration in BatchPlanner#afterTranslation().
> > > > > > >
> > > > > > > For simplicity, as long as the adaptive broadcast join/skewed join optimization features are enabled (e.g., `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the corresponding strategy will be configured. This configuration is independent of the specific SQL query, although the strategy might not produce any actual effect for a given query.
> > > > > > >
> > > > > > > Best,
> > > > > > > Xia
> > > > > > >
> > > > > > > On Wed, Jul 24, 2024 at 14:10, Ron Liu wrote:
> > > > > > > > Hi, Xia
> > > > > > > >
> > > > > > > > This FLIP looks good to me, +1.
> > > > > > > >
> > > > > > > > I have two questions:
> > > > > > > >
> > > > > > > > 1.
> > > > > > > > >> Accordingly, in terms of implementation, we will delay the codegen and creation of the join operators until runtime.
> > > > > > > >
> > > > > > > > How are you delaying codegen to runtime? The current runtime is not aware of the SQL planner. In other words, how should I understand this sentence?
> > > > > > > >
> > > > > > > > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to the runtime via the option `execution.batch.adaptive.stream-graph-optimization.strategies`. In the SQL planner, if you have multiple different optimization strategies like broadcast join, skew join, etc., when did you configure these optimization strategies uniformly into `execution.batch.adaptive.stream-graph-optimization.strategies`?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jul 19, 2024 at 17:41, Zhu Zhu wrote:
> > > > > > > > > +1 for the FLIP
> > > > > > > > >
> > > > > > > > > It's a good start to adaptively optimize the logical execution plan with runtime information.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Zhu
> > > > > > > > >
> > > > > > > > > On Thu, Jul 18, 2024 at 18:23, Xia Sun wrote:
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > Junrui Lee, Lei Yang, and I would like to initiate a discussion about FLIP-470: Support Adaptive Broadcast Join[1].
> > > > > > > > > >
> > > > > > > > > > In general, Broadcast Hash Join is currently the most efficient join strategy available in Flink.
> > > > > > > > > > However, its prerequisite is that the input data on one side must be sufficiently small; otherwise, it may lead to memory overuse or other issues. Currently, due to the lack of precise statistics, it is difficult to make accurate estimations during the Flink SQL planning phase. For example, when an upstream Filter operator is present, it is easy to overestimate the size of the table, whereas with an expansion operator, the table size tends to be underestimated. Moreover, once the join operator is determined, it cannot be modified at runtime.
> > > > > > > > > >
> > > > > > > > > > To address this issue, we plan to introduce Adaptive Broadcast Join capability based on FLIP-468: Introducing StreamGraph-Based Job Submission[2] and FLIP-469: Supports Adaptive Optimization of StreamGraph[3]. This will allow the join operator to be dynamically optimized to a Broadcast Join based on the actual input data volume at runtime, and to fall back when the optimization conditions are not met.
> > > > > > > > > >
> > > > > > > > > > For more details, please refer to FLIP-470[1]. We look forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Junrui Lee, Lei Yang and Xia Sun
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
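
To make the option semantics proposed in this thread concrete, here is a minimal Java sketch of how `table.optimizer.adaptive-broadcast-join.strategy` (AUTO / RUNTIME_ONLY / NONE) might combine with the reused `table.optimizer.join.broadcast-threshold`. It is a hypothetical illustration, not Flink code: the class `AdaptiveBroadcastJoinSketch`, the method `decide`, and the `Decision` values are invented names. It also assumes that AUTO permits both the compile-time and the runtime decision, that a negative threshold such as -1 disables broadcasting, and that the threshold is compared against the size of the smaller join input.

```java
/**
 * Hypothetical sketch (not Flink code) of how the proposed
 * table.optimizer.adaptive-broadcast-join.strategy option could interact with
 * the reused table.optimizer.join.broadcast-threshold option.
 */
final class AdaptiveBroadcastJoinSketch {

    /** The three values proposed for table.optimizer.adaptive-broadcast-join.strategy. */
    enum Strategy { AUTO, RUNTIME_ONLY, NONE }

    /** Hypothetical outcome of the decision for a single join. */
    enum Decision { BROADCAST_AT_COMPILE_TIME, BROADCAST_AT_RUNTIME, KEEP_ORIGINAL_JOIN }

    private AdaptiveBroadcastJoinSketch() {}

    /**
     * @param strategy       value of the proposed strategy option
     * @param thresholdBytes value of table.optimizer.join.broadcast-threshold
     * @param estimatedBytes planner's size estimate for the smaller join input
     * @param actualBytes    size of that input observed at runtime
     */
    static Decision decide(Strategy strategy, long thresholdBytes,
                           long estimatedBytes, long actualBytes) {
        // Assumption: a negative threshold (e.g. -1) disables broadcast joins entirely.
        if (thresholdBytes < 0) {
            return Decision.KEEP_ORIGINAL_JOIN;
        }
        // Compile-time check on the estimate; skipped when only runtime optimization is wanted.
        if (strategy != Strategy.RUNTIME_ONLY && estimatedBytes <= thresholdBytes) {
            return Decision.BROADCAST_AT_COMPILE_TIME;
        }
        // Runtime check on the observed size; skipped when only compile-time optimization is wanted.
        if (strategy != Strategy.NONE && actualBytes <= thresholdBytes) {
            return Decision.BROADCAST_AT_RUNTIME;
        }
        // Fallback: keep the join operator the planner originally chose.
        return Decision.KEEP_ORIGINAL_JOIN;
    }
}
```

For example, if a selective upstream Filter makes the planner estimate one side at 5,000 bytes while only 100 bytes actually arrive, `decide(Strategy.AUTO, 1024, 5000, 100)` skips the compile-time broadcast but returns `BROADCAST_AT_RUNTIME`; with `Strategy.NONE` the same input keeps the original join. The sketch deliberately does not model reverting a compile-time broadcast whose input turns out to be larger than estimated.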
