Hi Jing, thanks for driving the discussion.

Have you investigated the join hint syntax used by other systems?
Why did you choose USE_HASH from Oracle rather than Spark's SHUFFLE_HASH?
The two styles are quite different.
People in the big data world are probably more familiar with Spark/Hive, so
if we need to choose one, I personally prefer the Spark style.


Best,
Wenlong

On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:

>
>
>
> Hi Jing,
> Thanks for your detailed reply.
> 1) In my last suggestion, hashing by primary key was not meant to raise the
> cache hit ratio, but to handle skew in the left source. Now that you have
> the 'skew' hint and a separate discussion about it, I'm looking forward to it.
> 2) I meant supporting a user-defined partitioner function. We have a case
> where we join a data lake source that is partitioned in a special way, and
> we have implemented it, not very elegantly, in our internal version. As you
> said, it needs more design.
> 3) I think the so-called 'HashPartitionedCache' is useful; otherwise,
> loading all data, as the Hive lookup table source does, is almost unusable
> with big data.
>
> Best regards,
> Yuan
>
> On 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> >Hi, Lincoln
> >Thanks a lot for the feedback.
> >
> >>  Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> >Things are a little different from RDBMS in the distributed world, and we
> >also aim to solve the data skew problem, so all these upcoming hint names
> >should be considered together.
> >
> >I will discuss the skew problem separately in the next FLIP, but I would
> >like to share the hint proposal for skew here.
> >I want to introduce a 'skew' hint, which is a query hint similar to the
> >skew hints in Spark [1] and MaxCompute [2].
> >The 'skew' hint could take one of three forms: only the name of the
> >skewed table; the table name and column names; or the table name, column
> >names and skew values.
> >For example:
> >For example:
> >
> >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> >o.total, c.country, c.zip
> >FROM Orders AS o
> >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >ON o.customer_id = c.id;
> >
> >The 'skew' hint is not only for lookup join here; it could also be
> >used for other types of join later, for example batch hash join or
> >streaming regular join.
> >Back to the question of a better name for hash lookup join: since the
> >'skew' hint is a separate hint, 'use_hash' is still an option.
> >WDYT?
> >I don't have a good idea for a better hint name yet. I would like to
> >hear more suggestions about hint names.
> >
> >>  As you mentioned in the FLIP, this solution depends on future changes to
> >Calcite (and upgrading Calcite would be another possibly big change:
> >at least Calcite 1.30 vs 1.26; are we prepared to accept this big
> >change?).
> >
> >Indeed, solution 1 depends on a Calcite upgrade.
> >I admit that upgrading from Calcite 1.26 to 1.30 would be a big change. I
> >still remember what we suffered during the last upgrade to Calcite 1.26.
> >However, we cannot avoid upgrading forever, for the following reasons:
> >1. Other features also depend on the Calcite upgrade, for example Session
> >Window and Count Window.
> >2. If we keep avoiding Calcite upgrades, the gap to the latest version
> >will keep growing. If an upgrade becomes unavoidable one day, the pain
> >will be even greater.
> >
> >WDYT?
> >
> >>  Is there another possible way to minimize the change in calcite?
> >
> >Did you check the 'Other Alternatives' part of FLIP-204? It gives
> >another solution which does not depend on a Calcite upgrade and does not
> >need to worry about the hint being lost during propagation.
> >This is also what we have done in our internal version.
> >The core idea is propagating the 'use_hash' hint to the TableScan with
> >matching table names.  However, it is a little hacky.
> >
> >> As far as I know, there are more limitations than `Correlate`.
> >
> >As mentioned before, in our internal version I chose the approach in the
> >'Other Alternatives' part of FLIP-204.
> >Although I did a POC of solution 1 and listed all the changes I found in
> >the FLIP, there may still be something I missed.
> >I'm very happy to hear that you found more limitations besides
> >`Correlate`; would you please give more details on this part?
> >
> >Best,
> >Jing Zhang
> >
> >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
> >[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
> >
> >On Wed, 29 Dec 2021 at 14:40, Jing Zhang <beyond1...@gmail.com> wrote:
> >
> >> Hi Yuan and Lincoln,
> >> thanks a lot for the attention. I will answer the emails one by one.
> >>
> >> To Yuan
> >> > How shall we deal with CDC data? If there is CDC data in the pipeline,
> >> IMHO, shuffle by join key will cause CDC data disorder. Will it be better
> >> to use primary key in this case?
> >>
> >> Good question.
> >> The problem exists not only for CDC data sources, but also whenever the
> >> input stream is not an insert-only stream (for example, the result of an
> >> unbounded aggregate or a regular join).
> >> I don't think hashing by primary key is a good choice. It would not raise
> >> the cache hit ratio, because the cache key is the lookup key rather than
> >> the primary key of the input.
> >>
> >> To avoid wrong results, hash lookup join requires that the input stream
> >> be an insert-only stream or that its upsert keys contain the lookup keys.
> >>
> >> I've added this limitation to the FLIP; thanks a lot for the reminder.
> >>
> >> > Can the shuffle keys be customized when users have knowledge about
> >> the distribution of the data?
> >>
> >> I'm not sure I understand your question.
> >>
> >> Do you mean supporting a user-defined partitioner function on keys in
> >> SQL, just like in Flink DataStream?
> >> If yes, I'm afraid there is no plan to support this feature yet, because
> >> it involves many things, for example:
> >> 1. SQL syntax
> >> 2. user-defined partitioner API
> >> 3. RelDistribution type extension and Flink RelDistribution extension
> >> 4. FlinkExpandConversionRule
> >> 5. Exchange execNode extension
> >> 6. ....
> >> It needs careful design and more discussion. If this is a strong
> >> requirement, we can drive a separate discussion on this point.
> >> In this FLIP, I would first support hash shuffle. WDYT?
> >>
> >> Or do you mean supporting hash by keys other than the lookup key?
> >> If yes, would you please tell me a specific use case?
> >> We need to fetch records from the external storage of the dimension
> >> table by lookup key, so those dimension table sources use the lookup
> >> keys as the cache key.
> >> We can only increase the cache hit ratio by shuffling on the lookup keys.
> >> I need more use cases to understand this requirement.
> >>
> >> > Some connectors such as hive, caches all data in LookupFunction. How to
> >> decrease the valid cache data size if data can be shuffled?
> >>
> >> Very good idea.
> >> There are two types of cache.
> >> For key-value storage, such as Redis/HBase, the lookup table source
> >> stores the visited lookup keys and their records in the cache lazily.
> >> For other storage without keys, such as Hive, each task loads all data
> >> into the cache eagerly in the initialization phase.
> >> After introducing the hash partitioner, nothing needs to change for
> >> key-value storage; for Hive, each task could load only part of the data
> >> instead of loading all of it.
> >>
> >> We have implemented this optimization in our internal version.
> >> The core idea is to push the partitioner information down to the lookup
> >> table source. When loading data into the cache, each task stores only
> >> those records whose lookup keys are sent to the current task.
> >> We call this 'HashPartitionedCache'.
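
A minimal sketch of the 'HashPartitionedCache' idea quoted above, written in
plain Python rather than Flink code. The names `owner_task` and
`load_partitioned_cache` and the CRC32-based partitioner are assumptions made
purely for illustration, not part of FLIP-204 or of any Flink API. The point
it tries to show is that the filter applied while loading the cache has to use
the same partitioner as the shuffle on the lookup join input.

```python
import zlib


def owner_task(lookup_key, num_tasks):
    """Hypothetical partitioner; it must match the one used to shuffle the input."""
    return zlib.crc32(repr(lookup_key).encode()) % num_tasks


def load_partitioned_cache(rows, key_field, task_index, num_tasks):
    """Keep only the dimension rows whose lookup key is routed to this task."""
    cache = {}
    for row in rows:
        key = row[key_field]
        if owner_task(key, num_tasks) == task_index:
            cache.setdefault(key, []).append(row)
    return cache


# A made-up dimension table with 1000 rows, loaded by 4 parallel tasks.
dim_rows = [{"id": i, "country": f"c{i % 7}"} for i in range(1000)]
num_tasks = 4
caches = [
    load_partitioned_cache(dim_rows, "id", task, num_tasks)
    for task in range(num_tasks)
]

for task, cache in enumerate(caches):
    print(f"task {task} holds {len(cache)} of {len(dim_rows)} keys")
```

Each task ends up holding a disjoint slice of the table, so the total cached
data stays roughly constant as parallelism grows instead of being multiplied
by the number of tasks.
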
> >>
> >> I have added this point to the Lookup Join requirements list in the
> >> motivation of the FLIP, but I will not address it in this FLIP right
> >> now.
> >> If this is a strong requirement, we need to drive a separate discussion
> >> on this topic, because it involves many API extensions.
> >>
> >> Best,
> >> Jing Zhang
> >>
> >>
> >> On Wed, 29 Dec 2021 at 10:01, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >>
> >>> Hi Jing,
> >>>     Thanks for bringing up this discussion!  Agree that these join hints
> >>> should benefit both bounded and unbounded cases, as Martijn mentioned.
> >>> I also agree that implementing the query hint is the right way for a
> >>> more general purpose, since dynamic table options have a limited scope.
> >>>    Some points I'd like to share are:
> >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> >>> Things are a little different from RDBMS in the distributed world, and we
> >>> also aim to solve the data skew problem, so all these upcoming hint names
> >>> should be considered together.
> >>> 2. As you mentioned in the FLIP, this solution depends on future changes
> >>> to Calcite (and upgrading Calcite would be another possibly big change:
> >>> at least Calcite 1.30 vs 1.26; are we prepared to accept this big
> >>> change?). Is there another possible way to minimize the change in Calcite?
> >>> As far as I know, there are more limitations than `Correlate`.
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
> >>>
> >>> On Tue, 28 Dec 2021 at 23:04, Jing Zhang <beyond1...@gmail.com> wrote:
> >>>
> >>> > Hi Martijn,
> >>> > Thanks a lot for your attention.
> >>> > I'm sorry I didn't explain the motivation clearly. I would like to
> >>> > explain it in detail, and then respond to your questions.
> >>> > A lookup join is typically used to enrich a table with data that is
> >>> > queried from an external system. Many lookup table sources, such as the
> >>> > JDBC, CSV and HBase connectors, introduce a cache in order to reduce
> >>> > RPC calls.
> >>> > For those connectors, we could raise the cache hit ratio by routing the
> >>> > same lookup keys to the same task instance. This is the purpose of
> >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> >>> > .
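
A small simulation of the routing argument quoted above, as a sketch only: it
is plain Python, not Flink code, and the LRU cache, the CRC32 partitioner, the
key count, the cache size and the parallelism are all assumptions chosen for
illustration. It compares the hit ratio of per-task caches when records are
routed round-robin versus hashed by lookup key.

```python
import random
import zlib
from collections import OrderedDict


class LruCache:
    """Tiny LRU cache standing in for a lookup source's per-task cache."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()
        self.hits = 0
        self.misses = 0

    def lookup(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)
            self.hits += 1
        else:
            self.misses += 1  # a miss would be an RPC to the external system
            self.entries[key] = True
            if len(self.entries) > self.capacity:
                self.entries.popitem(last=False)


def hit_ratio(keys, num_tasks, cache_size, route):
    """Send every key to the task chosen by `route` and return the overall hit ratio."""
    caches = [LruCache(cache_size) for _ in range(num_tasks)]
    for i, key in enumerate(keys):
        caches[route(i, key, num_tasks)].lookup(key)
    return sum(c.hits for c in caches) / len(keys)


def round_robin(i, key, num_tasks):
    return i % num_tasks


def hash_by_key(i, key, num_tasks):
    return zlib.crc32(str(key).encode()) % num_tasks


rng = random.Random(42)
keys = [rng.randrange(400) for _ in range(20000)]  # 400 distinct lookup keys
rr = hit_ratio(keys, 4, 100, round_robin)
hashed = hit_ratio(keys, 4, 100, hash_by_key)
print(f"round robin: {rr:.2f}, hash by lookup key: {hashed:.2f}")
```

With round-robin routing every task sees all 400 keys but can cache only 100
of them, while with hash routing each task sees only its own share of the
keys, which is why its caches are hit far more often in this setup.
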
> >>> > Other cases, such as the batch hash join you mentioned, might also
> >>> > benefit from hash distribution. It is a cool idea; however, it is not
> >>> > the purpose of this FLIP. We could discuss it in FLINK-20670
> >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
> >>> >
> >>> > > - When I was reading about this topic [1] I was wondering if this feature
> >>> > would be more beneficial for bounded use cases and not so much for
> >>> > unbounded use cases. What do you think?
> >>> >
> >>> > As mentioned before, the purpose of Hash Lookup Join is to increase the
> >>> > cache hit ratio, which is different from Oracle Hash Join. However, we
> >>> > could use a similar hint syntax.
> >>> >
> >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> >>> > notice that all of the hints there are located at the end of the SQL
> >>> > statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> >>> > keyword. Can we somehow make this consistent for the user? Or should the
> >>> > user be able to specify hints anywhere in its SQL statement?
> >>> >
> >>> > Calcite supports hints in two locations [3]:
> >>> > Query Hint: right after the SELECT keyword;
> >>> > Table Hint: right after the referenced table name.
> >>> > Flink already supports dynamic table options based on Calcite's hint
> >>> > framework, as mentioned in doc [2].
> >>> > Besides, query hints are also important: they can give the optimizer a
> >>> > hint to choose a better plan. Almost all popular databases and
> >>> > big-data engines support SQL query hints, such as Oracle, Hive, Spark
> >>> > and so on.
> >>> > I think using query hints in this case is more natural for users. WDYT?
> >>> >
> >>> > I have updated the motivation part in the FLIP.
> >>> > Thanks for the feedback!
> >>> >
> >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
> >>> >
> >>> > Best,
> >>> > Jing Zhang
> >>> >
> >>> > On Tue, 28 Dec 2021 at 22:02, Martijn Visser <mart...@ververica.com> wrote:
> >>> >
> >>> > > Hi Jing,
> >>> > >
> >>> > > Thanks a lot for the explanation and the FLIP. I definitely learned
> >>> > > something when reading more about `use_hash`. My interpretation would
> >>> > > be that the primary benefit of a hash lookup join would be improved
> >>> > > performance by allowing the user to explicitly optimise the planner.
> >>> > >
> >>> > > I have a couple of questions:
> >>> > >
> >>> > > - When I was reading about this topic [1] I was wondering if this
> >>> > > feature would be more beneficial for bounded use cases and not so much
> >>> > > for unbounded use cases. What do you think?
> >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> >>> > > notice that all of the hints there are located at the end of the SQL
> >>> > > statement. In the FLIP, the use_hash is defined directly after the
> >>> > > 'SELECT' keyword. Can we somehow make this consistent for the user? Or
> >>> > > should the user be able to specify hints anywhere in its SQL statement?
> >>> > >
> >>> > > Best regards,
> >>> > >
> >>> > > Martijn
> >>> > >
> >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> >>> > >
> >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
> >>> > >
> >>> > > > Hi everyone,
> >>> > > > Lookup join
> >>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>[1]
> >>> > > > is a commonly used feature in Flink SQL. We have received many
> >>> > > > optimization requirements for lookup join. For example:
> >>> > > > 1. Enforce a hash partitioner on the left side of the lookup join to
> >>> > > > raise the cache hit ratio
> >>> > > > 2. Solve the data skew problem that arises after introducing hash
> >>> > > > lookup join
> >>> > > > 3. Enable mini-batch optimization to reduce RPC calls
> >>> > > >
> >>> > > > Next we will solve these problems one by one. First, we will focus
> >>> > > > on point 1, and continue to discuss points 2 and 3 later.
> >>> > > >
> >>> > > > There are many similar requirements from the user mailing list and
> >>> > > > JIRA about hash lookup join, for example:
> >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
> >>> > > > Introduce partitioned lookup join to enforce input of LookupJoin to
> >>> > > > hash shuffle by lookup keys
> >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
> >>> > > > lookupjoin source table for pre-partitioning
> >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
> >>> > > > Support to send data to lookup table for KeyGroupStreamPartitioner
> >>> > > > way for SQL.
> >>> > > >
> >>> > > > In this FLIP, I would like to start a discussion about Hash Lookup
> >>> > > > Join.
> >>> > > > The core idea is introducing a 'USE_HASH' hint in the query.  This
> >>> > > > syntax is directly user-facing and therefore requires careful design.
> >>> > > > There are two ways to propagate this hint to LookupJoin in the
> >>> > > > optimizer. We need further discussion to make a final decision. In
> >>> > > > any case, the difference between the two solutions is only about the
> >>> > > > internal implementation and has no impact on the user.
> >>> > > >
> >>> > > > For more detail on the proposal:
> >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> >>> > > >
> >>> > > >
> >>> > > > Looking forward to your feedback, thanks.
> >>> > > >
> >>> > > > Best,
> >>> > > > Jing Zhang
> >>> > > >
> >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
