Hi Jing, thanks for driving the discussion. Have you investigated the join hint syntax used by other systems? Why did you choose Oracle's USE_HASH rather than Spark's SHUFFLE_HASH style? They are quite different. People in the big data world may be more familiar with Spark/Hive; if we need to choose one, I personally prefer the Spark style.
Best,
Wenlong

On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:

> Hi Jing,
> Thanks for your detailed reply.
> 1) In my last suggestion, hashing by primary key was not meant to raise the cache hit ratio, but to handle skew in the left source. Now that you have the 'skew' hint and other discussion about it, I'm looking forward to it.
> 2) I meant supporting a user-defined partitioner function. We have a case that joins a data lake source with a special partitioning scheme, and we have implemented it, not elegantly, in our internal version. As you said, it needs more design.
> 3) I think the so-called 'HashPartitionedCache' is useful; otherwise loading all data, as the Hive lookup table source does, is almost unusable with big data.
>
> Best regards,
> Yuan
>
> At 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> >Hi Lincoln,
> >Thanks a lot for the feedback.
> >
> >> Regarding the hint name ‘USE_HASH’, could we consider more candidates? Things are a little different from RDBMS in the distributed world, and we also aim to solve the data skew problem, so all these incoming hint names should be considered together.
> >
> >I would discuss the skew problem separately in the next FLIP, but I would like to share the hint proposal for skew here.
> >I want to introduce a 'skew' hint, which is a query hint similar to the skew hint in Spark [1] and MaxCompute [2].
> >The 'skew' hint could contain only the name of the table with skew.
> >It could also accept a table name and column names.
> >In addition, it could accept a table name, column names, and skew values.
> >For example:
> >
> >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id, o.total, c.country, c.zip
> >FROM Orders AS o
> >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >ON o.customer_id = c.id;
> >
> >The 'skew' hint is not only for lookup join; it could also be used for other types of join later, for example batch hash join or streaming regular join.
> >Back to the question of a better name for hash lookup join: since the 'skew' hint is a separate hint, 'use_hash' is still an option.
> >WDYT?
> >I don't have a good idea for a better hint name yet. I would like to hear more suggestions about hint names.
> >
> >> As you mentioned in the FLIP, this solution depends on future changes to Calcite (and upgrading Calcite would be another possibly big change: at least calcite-1.30 vs 1.26; are we prepared to accept this big change?).
> >
> >Indeed, solution 1 depends on a Calcite upgrade.
> >I admit that upgrading from Calcite 1.26 to 1.30 would be a big change. I still remember what we suffered during the last upgrade to Calcite 1.26.
> >However, we cannot avoid upgrading forever, for the following reasons:
> >1. Other features also depend on the Calcite upgrade, for example Session Window and Count Window.
> >2. If we keep avoiding the Calcite upgrade, the gap with the latest version will keep growing. If upgrading one day becomes unavoidable, the pain will be greater.
> >
> >WDYT?
> >
> >> Is there another possible way to minimize the change in Calcite?
> >
> >Did you check the 'Other Alternatives' part of FLIP-204? It gives another solution that does not depend on a Calcite upgrade and does not need to worry about the hint being lost during propagation.
> >This is also what we have done in our internal version.
> >The core idea is to propagate the 'use_hash' hint to the TableScan with matching table names. However, it is a little hacky.
> >
> >> As far as I know, there are more limitations than `Correlate`.
> >
> >As mentioned before, in our internal version I chose the approach in the 'Other Alternatives' part of FLIP-204.
> >Although I did a POC of solution 1 and listed all the changes I found in the FLIP, there may still be something I missed.
> >I'm very happy to hear that you point out there are more limitations besides `Correlate`. Would you please give more details on this part?
> >
> >Best,
> >Jing Zhang
> >
> >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
> >[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
> >
> >Jing Zhang <beyond1...@gmail.com> wrote on Wed, 29 Dec 2021 at 14:40:
> >
> >> Hi Yuan and Lincoln,
> >> Thanks a lot for the attention. I will answer the emails one by one.
> >>
> >> To Yuan
> >> > How shall we deal with CDC data? If there is CDC data in the pipeline, IMHO, shuffle by join key will cause CDC data disorder. Will it be better to use primary key in this case?
> >>
> >> Good question.
> >> The problem exists not only for CDC data sources, but also whenever the input stream is not an insert-only stream (for example, the result of an unbounded aggregate or a regular join).
> >> I think hashing by primary key is not a good choice. It would not raise the cache hit ratio, because the cache key is the lookup key rather than the primary key of the input.
> >>
> >> To avoid wrong results, hash lookup join requires that the input stream be an insert-only stream or that its upsert keys contain the lookup keys.
> >>
> >> I've added this limitation to the FLIP, thanks a lot for the reminder.
> >>
> >> > If the shuffle keys can be customized when users have the knowledge about distribution of data?
> >>
> >> I'm not sure I understand your question.
> >>
> >> Do you mean supporting a user-defined partitioner function on keys, just like Flink DataStream?
> >> If yes, I'm afraid there is no plan to support this feature yet, because it involves many things, for example:
> >> 1. SQL syntax
> >> 2. user-defined partitioner API
> >> 3. RelDistribution type extension and Flink RelDistribution extension
> >> 4. FlinkExpandConversionRule
> >> 5. Exchange execNode extension
> >> 6. ....
> >> It needs careful design and more discussion. If this is a strong requirement, we could drive a separate discussion on this point.
> >> In this FLIP, I would first support hash shuffle. WDYT?
> >>
> >> Or do you mean supporting hash by keys other than the lookup key?
> >> If yes, would you please give me a specific use case?
> >> We need to fetch records from the external storage of the dimension table by lookup key, so those dimension table sources use lookup keys as cache keys.
> >> We can only increase the cache hit ratio by shuffling on lookup keys.
> >> I need more use cases to understand this requirement.
> >>
> >> > Some connectors such as hive, caches all data in LookupFunction. How to decrease the valid cache data size if data can be shuffled?
> >>
> >> Very good idea.
> >> There are two types of cache.
> >> For key-value storage, such as Redis/HBase, the lookup table source stores the visited lookup keys and their records in the cache lazily.
> >> For other storage without keys, such as Hive, each task loads all data into the cache eagerly in the initialization phase.
> >> After introducing the hash partitioner, nothing needs to change for key-value storage; for Hive, each task could load only part of the data instead of all of it.
> >>
> >> We have implemented this optimization in our internal version.
> >> The core idea is to push the partitioner information down to the lookup table source. When loading data into the cache, each task stores only those records whose lookup keys are sent to the current task.
> >> We call this 'HashPartitionedCache'.
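The partitioned cache idea described above can be sketched roughly as follows. This is a minimal illustration and not the internal implementation mentioned in the thread: the function names (`task_for_key`, `load_partitioned_cache`), the sample rows, and the use of CRC32 as the hash are all assumptions made for the example. Flink's real hash shuffle assigns keys through its own key-group hashing, so a real source would have to reuse exactly the partitioner that the upstream exchange uses.

```python
import zlib


def task_for_key(lookup_key: str, parallelism: int) -> int:
    """Map a lookup key to a task index with a stable hash.

    CRC32 is an assumption for illustration only; the cache must use the
    same function as the upstream shuffle, whatever that function is.
    """
    return zlib.crc32(lookup_key.encode("utf-8")) % parallelism


def load_partitioned_cache(rows, key_of, task_index, parallelism):
    """Eagerly load only the rows whose lookup key is routed to this task."""
    cache = {}
    for row in rows:
        key = key_of(row)
        if task_for_key(key, parallelism) == task_index:
            cache.setdefault(key, []).append(row)
    return cache


# Hypothetical dimension table with 1000 rows keyed by "id".
dimension_rows = [
    {"id": f"c{i}", "country": "CN" if i % 2 else "US"} for i in range(1000)
]
parallelism = 4

# Each task builds its own slice of the cache instead of a full copy.
caches = [
    load_partitioned_cache(dimension_rows, lambda r: r["id"], t, parallelism)
    for t in range(parallelism)
]
```

With this scheme the total cached data across all tasks equals one copy of the table, instead of one full copy per task, and a probe record shuffled by the same function always lands on the task that holds its key.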
> >>
> >> I have added this point to the Lookup Join requirements list in the motivation of the FLIP, but I would not address it in this FLIP right now.
> >> If this is a strong requirement, we need to drive a separate discussion on this topic, because it involves many API extensions.
> >>
> >> Best,
> >> Jing Zhang
> >>
> >>
> >> Lincoln Lee <lincoln.8...@gmail.com> wrote on Wed, 29 Dec 2021 at 10:01:
> >>
> >>> Hi Jing,
> >>> Thanks for bringing up this discussion! I agree that these join hints should benefit both bounded and unbounded cases, as Martijn mentioned.
> >>> I also agree that implementing the query hint is the right way for a more general purpose, since the dynamic table options have a limited scope.
> >>> Some points I'd like to share are:
> >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates? Things are a little different from RDBMS in the distributed world, and we also aim to solve the data skew problem, so all these incoming hint names should be considered together.
> >>> 2. As you mentioned in the FLIP, this solution depends on future changes to Calcite (and upgrading Calcite would be another possibly big change: at least calcite-1.30 vs 1.26; are we prepared to accept this big change?). Is there another possible way to minimize the change in Calcite? As far as I know, there are more limitations than `Correlate`.
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
> >>>
> >>> Jing Zhang <beyond1...@gmail.com> wrote on Tue, 28 Dec 2021 at 23:04:
> >>>
> >>> > Hi Martijn,
> >>> > Thanks a lot for your attention.
> >>> > I'm sorry I didn't explain the motivation clearly. I would like to explain it in detail and then respond to your questions.
> >>> > A lookup join is typically used to enrich a table with data that is queried from an external system.
> >>> > Many lookup table sources, such as the JDBC, CSV, and HBase connectors, introduce a cache in order to reduce RPC calls.
> >>> > For those connectors, we could raise the cache hit ratio by routing the same lookup keys to the same task instance. This is the purpose of
> >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> >>> > Other cases might benefit from hash distribution, such as batch hash join as you mentioned. It is a cool idea; however, it is not the purpose of this FLIP. We could discuss it in FLINK-20670 <https://issues.apache.org/jira/browse/FLINK-20670>.
> >>> >
> >>> > > - When I was reading about this topic [1] I was wondering if this feature would be more beneficial for bounded use cases and not so much for unbounded use cases. What do you think?
> >>> >
> >>> > As mentioned before, the purpose of Hash Lookup Join is to increase the cache hit ratio, which is different from Oracle Hash Join. However, we could use a similar hint syntax.
> >>> >
> >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I notice that all of the hints there are located at the end of the SQL statement. In the FLIP, the use_hash is defined directly after the 'SELECT' keyword. Can we somehow make this consistent for the user? Or should the user be able to specify hints anywhere in its SQL statement?
> >>> >
> >>> > Calcite supports hints in two locations [3]:
> >>> > Query Hint: right after the SELECT keyword;
> >>> > Table Hint: right after the referenced table name.
> >>> > Flink already supports dynamic table options based on Calcite's hint framework, as mentioned in doc [2].
> >>> > Besides, query hints are also important: they give the optimizer a hint for choosing a better plan.
> >>> > Almost all popular databases and big-data engines support SQL query hints, such as Oracle, Hive, Spark and so on.
> >>> > I think using query hints in this case is more natural for users, WDYT?
> >>> >
> >>> > I have updated the motivation part in the FLIP.
> >>> > Thanks for the feedback!
> >>> >
> >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
> >>> >
> >>> > Best,
> >>> > Jing Zhang
> >>> >
> >>> > Martijn Visser <mart...@ververica.com> wrote on Tue, 28 Dec 2021 at 22:02:
> >>> >
> >>> > > Hi Jing,
> >>> > >
> >>> > > Thanks a lot for the explanation and the FLIP. I definitely learned something when reading more about `use_hash`. My interpretation would be that the primary benefit of a hash lookup join would be improved performance by allowing the user to explicitly optimise the planner.
> >>> > >
> >>> > > I have a couple of questions:
> >>> > >
> >>> > > - When I was reading about this topic [1] I was wondering if this feature would be more beneficial for bounded use cases and not so much for unbounded use cases. What do you think?
> >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I notice that all of the hints there are located at the end of the SQL statement. In the FLIP, the use_hash is defined directly after the 'SELECT' keyword. Can we somehow make this consistent for the user? Or should the user be able to specify hints anywhere in its SQL statement?
> >>> > >
> >>> > > Best regards,
> >>> > >
> >>> > > Martijn
> >>> > >
> >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> >>> > >
> >>> > >
> >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
> >>> > >
> >>> > > > Hi everyone,
> >>> > > > Lookup join <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join> [1] is a commonly used feature in Flink SQL. We have received many optimization requests for lookup join. For example:
> >>> > > > 1. Enforce a hash partitioner on the left side of the lookup join to raise the cache hit ratio
> >>> > > > 2. Solve the data skew problem that arises after introducing hash lookup join
> >>> > > > 3. Enable mini-batch optimization to reduce RPC calls
> >>> > > >
> >>> > > > We will solve these problems one by one. First, we will focus on point 1, and continue to discuss points 2 and 3 later.
> >>> > > >
> >>> > > > There are many similar requests on the user mailing list and in JIRA about hash lookup join, for example:
> >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> - Introduce partitioned lookup join to enforce input of LookupJoin to hash shuffle by lookup keys
> >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> - lookupjoin source table for pre-partitioning
> >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> - Support to send data to lookup table for KeyGroupStreamPartitioner way for SQL.
> >>> > > >
> >>> > > > In this FLIP, I would like to start a discussion about Hash Lookup Join.
> >>> > > > The core idea is to introduce a 'USE_HASH' hint in the query. This syntax is directly user-facing and therefore requires careful design.
> >>> > > > There are two ways to propagate this hint to LookupJoin in the optimizer. We need further discussion to make a final decision. In any case, the difference between the two solutions is only in the internal implementation and has no impact on the user.
> >>> > > >
> >>> > > > For more detail on the proposal:
> >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> >>> > > >
> >>> > > > Looking forward to your feedback, thanks.
> >>> > > >
> >>> > > > Best,
> >>> > > > Jing Zhang
> >>> > > >
> >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join