Hi Francesco,
Thanks a lot for the feedback.

> does it make sense for a lookup join to use hash distribution whenever
> possible by default?
I prefer to enable the hash lookup join only when the hint is present in the
query, for the following reasons:
1. Plan compatibility
    Adding a hash shuffle by default would change the plan after users
upgrade their Flink version.
    Besides, lookup join is a commonly used feature in Flink SQL.
2. Not all Flink jobs would benefit from this improvement.
    It is a trade-off for lookup joins against dimension connectors that
have a cache inside.
    We hope to raise the cache hit ratio with Hash Lookup Join, but it
introduces an extra shuffle at the same time.
    It is not always a positive optimization, especially for connectors
that do not have a cache inside.
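
To make the trade-off in point 2 a bit more concrete, here is a rough Python
sketch. It is a toy model, not Flink code: it assumes each task holds an LRU
cache, uses `key % parallelism` as a stand-in for the hash partitioner, and
uses round-robin as a stand-in for the default distribution. `LruCache` and
`hit_ratio` are names made up for this illustration, and the cost of the
extra shuffle is not modelled at all.

```python
import random
from collections import OrderedDict


class LruCache:
    """Toy per-task lookup cache with least-recently-used eviction (hypothetical)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def lookup(self, key):
        """Return True on a hit; on a miss, cache the key and evict the oldest entry if full."""
        if key in self.entries:
            self.entries.move_to_end(key)
            return True
        self.entries[key] = None  # value omitted: only hits and misses matter here
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)
        return False


def hit_ratio(keys, parallelism, capacity, route):
    """Send each key to the task chosen by route(position, key); return the overall hit ratio."""
    caches = [LruCache(capacity) for _ in range(parallelism)]
    hits = 0
    for position, key in enumerate(keys):
        task = route(position, key) % parallelism
        if caches[task].lookup(key):
            hits += 1
    return hits / len(keys)


rng = random.Random(42)
keys = [rng.randrange(1000) for _ in range(100_000)]

# Without the hint: round-robin stands in for "any task may see any key".
no_shuffle = hit_ratio(keys, parallelism=4, capacity=250,
                       route=lambda position, key: position)
# With the hint: the same lookup key always lands on the same task.
hash_shuffle = hit_ratio(keys, parallelism=4, capacity=250,
                         route=lambda position, key: key)

print(f"no shuffle:   {no_shuffle:.2f}")
print(f"hash shuffle: {hash_shuffle:.2f}")
```

The sketch only shows the direction of the effect when the total key space
does not fit into a single task's cache. A connector without a cache would
gain nothing from the routing and would still pay for the shuffle.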

> Shouldn't the hint take the table alias as the "table name"? What if you
> do two lookup joins in cascade within the same query with the same table
> (once on a key, then on another one), where you use two different aliases
> for the table?
In theory, it would be better to support both table names and alias names.
But in Calcite, the alias name of a subquery or table would be lost in
the SQL conversion and SQL optimization phases.
So here we only support table names.

Best,
Jing Zhang


On Mon, Jan 3, 2022 at 18:38, Francesco Guardiani <france...@ververica.com> wrote:

> Hi Jing,
>
> Thanks for the FLIP. I'm not very knowledgeable about the topic, but going
> through both the FLIP and the discussion here, I wonder, does it make
> sense for a lookup join to use hash distribution whenever possible by
> default?
>
> The point you're explaining here:
>
> > Many Lookup table sources introduce cache in order
> to reduce the RPC call, such as JDBC, CSV, HBase connectors.
> For those connectors, we could raise cache hit ratio by routing the same
> lookup keys to the same task instance
>
> This seems like something we can infer automatically, rather than manually asking the
> user to add this hint to the query. Note that I'm not talking against the
> hint syntax, which might still make sense to be introduced, but I feel like
> this optimization makes sense in the general case when using the connectors
> you have quoted. Perhaps there is some downside I'm not aware of?
>
> Talking about the hints themselves, taking this example as reference:
>
> SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total,
> c.country, c.zip
> FROM Orders AS o
> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
>
> Shouldn't the hint take the table alias as the "table name"? What if you do
> two lookup joins in cascade within the same query with the same table (once
> on a key, then on another one), where you use two different aliases for the
> table?
>
>
> On Fri, Dec 31, 2021 at 9:56 AM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Hi Lincoln,
> > Thanks for the feedback.
> >
> > > 1. For the hint name, +1 for WenLong's proposal.
> >
> > I've added 'SHUFFLE_HASH' to the other alternatives in the FLIP. Let's wait
> > for more voices here.
> >
> > > Regarding the `SKEW` hint, agree with you that it can be used widely, and I
> > > prefer to treat it as a metadata hint, a new category that differs from a join
> > > hint.
> > For your example:
> > ```
> > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > o.total, c.country, c.zip
> > FROM Orders AS o
> > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > ON o.customer_id = c.id;
> > ```
> > I would prefer another form:
> > ```
> > -- provide the skew info to let the engine choose the optimal plan
> > SELECT /*+ SKEW('Orders') */ o.order_id, ...
> >
> > -- or introduce a new hint for the join case, e.g.,
> > SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> > ```
> >
> > Maybe there is a misunderstanding here.
> > I just used syntactic sugar here.
> >
> > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > ....
> >
> > is just syntactic sugar for
> >
> > SELECT /*+ USE_HASH('Orders', 'Customers') */ /*+SKEW('Orders') */
> > o.order_id,
> > ....
> >
> > Although I list the 'USE_HASH' and 'SKEW' hints in one query hint clause, it
> > does not mean they must appear together as a whole.
> > Based on the Calcite syntax doc [1], you can list more than one hint in
> > a /*+' hint [, hint ]* '*/ clause.
> >
> > Each hint has a different function.
> > The 'USE_HASH' hint suggests that the optimizer use a hash partitioner for the
> > Lookup Join of table 'Orders' and table 'Customers', while the 'SKEW' hint tells
> > the optimizer the skew metadata about the table 'Orders'.
> >
> > Best,
> > Jing Zhang
> >
> > [1] https://calcite.apache.org/docs/reference.html#sql-hints
> >
> >
> >
> >
> > On Fri, Dec 31, 2021 at 16:39, Jing Zhang <beyond1...@gmail.com> wrote:
> >
> > > Hi Martijn,
> > > Thanks for the feedback.
> > >
> > > Glad to hear that we reached a consensus on the first and second point.
> > >
> > > About whether to use `use_hash` as a term, I think your concern makes
> > > sense.
> > > Although the hash lookup join is similar to the hash join in Oracle in that
> > > both require hash distribution on the input, there is a small difference
> > > between them.
> > > On this point, Lincoln and WenLong both prefer the term 'SHUFFLE_HASH'.
> > > WDYT?
> > >
> > > Best,
> > > Jing Zhang
> > >
> > >
> > > On Thu, Dec 30, 2021 at 11:21, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > >
> > >> Hi Jing,
> > >>     Thanks for your explanation!
> > >>
> > >> 1. For the hint name, +1 for WenLong's proposal. I think the `SHUFFLE`
> > >> keyword is important in a classic distributed computing system:
> > >> a hash join usually means there's a shuffle stage (including shuffle
> > >> hash join and broadcast hash join). Users only need to pass the `build`
> > >> side table (usually the smaller one) into the `SHUFFLE_HASH` join hint,
> > >> which is more concise than `USE_HASH(left_table, right_table)`. Please
> > >> correct me if my understanding is wrong.
> > >> Regarding the `SKEW` hint, agree with you that it can be used widely, and I
> > >> prefer to treat it as a metadata hint, a new category that differs from a join
> > >> hint.
> > >> For your example:
> > >> ```
> > >> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > >> o.total, c.country, c.zip
> > >> FROM Orders AS o
> > >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > >> ON o.customer_id = c.id;
> > >> ```
> > >> I would prefer another form:
> > >> ```
> > >> -- provide the skew info to let the engine choose the optimal plan
> > >> SELECT /*+ SKEW('Orders') */ o.order_id, ...
> > >>
> > >> -- or introduce a new hint for the join case, e.g.,
> > >> SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> > >> ```
> > >>
> > >> 2. Agree with Martijn on adding the feature to 1.16; we need time to complete
> > >> the change in Calcite and also the upgrade work.
> > >>
> > >> 3. I misunderstood the 'Other Alternatives' part as the 'Rejected' ones in
> > >> the FLIP doc. My point is to avoid the hacky way as best we can.
> > >> As for the potential issues with Calcite's hint propagation: join hints must
> > >> propagate correctly into the proper join scope, including subqueries or views,
> > >> which may contain various SQL operators, so we should check all kinds of
> > >> operators for correct propagation. Hope this helps. Also cc @Shuo Cheng,
> > >> who may offer more help.
> > >>
> > >>
> > >> Best,
> > >> Lincoln Lee
> > >>
> > >>
> > >> On Wed, Dec 29, 2021 at 22:21, Martijn Visser <mart...@ververica.com> wrote:
> > >>
> > >> > Hi Jing,
> > >> >
> > >> > Thanks for explaining this in more detail and also to others
> > >> > participating.
> > >> >
> > >> > > I think using query hints in this case is more natural for users,
> > >> > > WDYT?
> > >> >
> > >> > Yes, I agree. As long as we properly explain in our documentation that
> > >> > we support both Query Hints and Table Hints, what's the difference
> > >> > between them and how to use them, I think our users can understand this
> > >> > perfectly.
> > >> >
> > >> > > I admit upgrading from Calcite 1.26 to 1.30 would be a big change.
> > >> > > However we could not always avoid upgrade for the following reason
> > >> >
> > >> > We have to upgrade Calcite. We actually considered putting that in the
> > >> > Flink 1.15 scope but ultimately had to drop it, but I definitely think
> > >> > this needs to be done for 1.16. It's not only because of new features
> > >> > that depend on Calcite upgrades, but also because newer versions have
> > >> > resolved bugs that also hurt our users. That's why we also already have
> > >> > tickets for upgrading to Calcite 1.27 [1] and 1.28 [2].
> > >> >
> > >> > With regards to using `use_hash` as a term, I think the most important
> > >> > part is that if we re-use a term that Oracle is using, the behaviour
> > >> > and outcome should be the same as or comparable to those of (in this
> > >> > case) Oracle. If their behaviour and outcome are not the same or
> > >> > comparable, I would probably introduce our own term to avoid confusing
> > >> > users.
> > >> >
> > >> > Best regards,
> > >> >
> > >> > Martijn
> > >> >
> > >> > [1] https://issues.apache.org/jira/browse/FLINK-20873
> > >> > [2] https://issues.apache.org/jira/browse/FLINK-21239
> > >> >
> > >> > On Wed, 29 Dec 2021 at 14:18, Jing Zhang <beyond1...@gmail.com>
> > wrote:
> > >> >
> > >> > > Hi Jian gang,
> > >> > > Thanks for the feedback.
> > >> > >
> > >> > > > When it comes to Hive, how do you load partial data instead of the
> > >> > > > whole data? Any change related to Hive?
> > >> > >
> > >> > > This is the same question Yuan raised before.
> > >> > > I prefer to drive another FLIP on this topic for separate discussion,
> > >> > > because this point involves many API extensions.
> > >> > > Here I would like to first share the implementation in our internal
> > >> > > version; it may be very different from the final solution that gets
> > >> > > merged into the community.
> > >> > > The core idea is to push the partitioner information down to the lookup
> > >> > > table source.
> > >> > > The Hive connector also needs to be upgraded. When loading data into
> > >> > > caches, each task would store only the records whose lookup keys are
> > >> > > sent to the current task.
> > >> > >
> > >> > > > How to define the cache configuration? For example, the size and the
> > >> > > > ttl.
> > >> > >
> > >> > > I'm afraid there is no unified caching configuration or cache
> > >> > > implementation across the different connectors yet.
> > >> > > You can find the cache size and TTL config for JDBC in doc [1] and for
> > >> > > HBase in doc [2].
> > >> > >
> > >> > > > Will this feature add another shuffle phase compared with the default
> > >> > > > behavior? In what situations will user choose this feature?
> > >> > >
> > >> > > Yes, if the user specifies the hash hint in the query, the optimizer
> > >> > > would prefer to choose Hash Lookup Join, which adds a hash shuffle.
> > >> > > If the lookup table source has a cache inside (for example HBase/JDBC)
> > >> > > and the benefit of a higher cache hit ratio outweighs the cost of the
> > >> > > extra shuffle, the user could use Hash Lookup Join.
> > >> > >
> > >> > > > For the keys, the default implementation will be ok. But I wonder
> > >> > > > whether we can support more flexible strategies.
> > >> > >
> > >> > > This is the same question Yuan raised before.
> > >> > >
> > >> > > I'm afraid there is no plan to support flexible strategies yet, because
> > >> > > the feature involves many things, for example:
> > >> > > 1. sql syntax
> > >> > > 2. user defined partitioner API
> > >> > > 3. RelDistribution type extension and Flink RelDistribution extension
> > >> > > 4. FlinkExpandConversionRule
> > >> > > 5. Exchange execNode extension
> > >> > > 6. ....
> > >> > > It needs careful design and more discussion. If this is a strong
> > >> > > requirement, we would drive a separate discussion on this point.
> > >> > > In this FLIP, I would first support hash shuffle. WDYT?
> > >> > >
> > >> > > Best,
> > >> > > Jing Zhang
> > >> > >
> > >> > > [1]
> > >> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
> > >> > > [2]
> > >> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options
> > >> > >
> > >> > > On Wed, Dec 29, 2021 at 20:37, Jing Zhang <beyond1...@gmail.com> wrote:
> > >> > >
> > >> > > > Hi Wenlong,
> > >> > > > Thanks for the feedback.
> > >> > > > I've checked similar syntax in other systems; they all differ from
> > >> > > > each other. There seems to be no consensus.
> > >> > > > As mentioned in FLIP-204, Oracle uses a query hint, and the hint name
> > >> > > > is 'use_hash' [1].
> > >> > > > Spark also uses a query hint; its name is 'SHUFFLE_HASH' [2].
> > >> > > > SQL Server uses the keyword 'HASH' instead of a query hint [3].
> > >> > > > Note that the purposes of hash shuffle in [1][2][3] are a little
> > >> > > > different from the purpose of FLIP-204; we only discuss syntax here.
> > >> > > >
> > >> > > > I've added this part to the FLIP, pending further discussion.
> > >> > > >
> > >> > > > Best,
> > >> > > > Jing Zhang
> > >> > > >
> > >> > > > [1]
> > >> > > > https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
> > >> > > > [2]
> > >> > > > https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
> > >> > > > [3]
> > >> > > > https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Dec 29, 2021 at 17:18, wenlong.lwl <wenlong88....@gmail.com> wrote:
> > >> > > >
> > >> > > >> Hi, Jing, thanks for driving the discussion.
> > >> > > >>
> > >> > > >> Have you investigated the syntax of join hints?
> > >> > > >> Why did you choose USE_HASH from Oracle instead of Spark's
> > >> > > >> SHUFFLE_HASH style? They are quite different.
> > >> > > >> People in the big data world may be more familiar with Spark/Hive; if
> > >> > > >> we need to choose one, personally, I prefer the Spark style.
> > >> > > >>
> > >> > > >>
> > >> > > >> Best,
> > >> > > >> Wenlong
> > >> > > >>
> > >> > > >> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
> > >> > > >>
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > Hi Jing,
> > >> > > >> > Thanks for your detailed reply.
> > >> > > >> > 1) In the last suggestion, hashing by primary key is not meant for
> > >> > > >> > raising the cache hit ratio, but for handling skew of the left source.
> > >> > > >> > Now that you have the 'skew' hint and other discussion about it, I'm
> > >> > > >> > looking forward to it.
> > >> > > >> > 2) I mean supporting a user-defined partitioner function. We have a
> > >> > > >> > case of joining a data lake source that has a special way of
> > >> > > >> > partitioning, and we have implemented it, not elegantly, in our
> > >> > > >> > internal version. As you said, it needs more design.
> > >> > > >> > 3) I think the so-called 'HashPartitionedCache' is useful; otherwise,
> > >> > > >> > loading all data, as the Hive lookup table source does, is hardly
> > >> > > >> > feasible with big data.
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > Best regards,
> > >> > > >> > Yuan
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> >
> > >> > > >> > On 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> > >> > > >> > >Hi, Lincoln
> > >> > > >> > >Thanks a lot for the feedback.
> > >> > > >> > >
> > >> > > >> > >>  Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> > >> > > >> > >>  Things are a little different from RDBMS in the distributed world, and we
> > >> > > >> > >>  also aim to solve the data skew problem, so all these incoming hint names
> > >> > > >> > >>  should be considered together.
> > >> > > >> > >
> > >> > > >> > >About the skew problem, I would discuss it separately in the next FLIP. I
> > >> > > >> > >would like to share the hint proposal for skew here.
> > >> > > >> > >I want to introduce a 'skew' hint, which is a query hint similar to the
> > >> > > >> > >skew hint in Spark [1] and MaxCompute [2].
> > >> > > >> > >The 'skew' hint could contain only the name of the table with skew.
> > >> > > >> > >Besides, the skew hint could accept a table name and column names.
> > >> > > >> > >In addition, the skew hint could accept a table name, column names and
> > >> > > >> > >skew values.
> > >> > > >> > >For example:
> > >> > > >> > >
> > >> > > >> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
> > >> > > >> > >o.total, c.country, c.zip
> > >> > > >> > >FROM Orders AS o
> > >> > > >> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > >> > > >> > >ON o.customer_id = c.id;
> > >> > > >> > >
> > >> > > >> > >The 'skew' hint is not only used for lookup join here, but could also be
> > >> > > >> > >used for other types of join later, for example, batch hash join or
> > >> > > >> > >streaming regular join.
> > >> > > >> > >Going back to the naming problem for hash lookup join: since the 'skew'
> > >> > > >> > >hint is a separate hint, 'use_hash' is still an alternative.
> > >> > > >> > >WDYT?
> > >> > > >> > >I don't have a good idea for a better hint name yet. I would like to
> > >> > > >> > >hear more suggestions about hint names.
> > >> > > >> > >
> > >> > > >> > >>  As you mentioned in the flip, this solution depends on future changes to
> > >> > > >> > >>  calcite (and also upgrading calcite would be another possible big change:
> > >> > > >> > >>  at least calcite-1.30 vs 1.26, are we preparing to accept this big
> > >> > > >> > >>  change?).
> > >> > > >> > >
> > >> > > >> > >Indeed, solution 1 depends on the Calcite upgrade.
> > >> > > >> > >I admit upgrading from Calcite 1.26 to 1.30 would be a big change. I still
> > >> > > >> > >remember what we suffered during the last upgrade to Calcite 1.26.
> > >> > > >> > >However, we cannot avoid upgrading forever, for the following reasons:
> > >> > > >> > >1. Other features also depend on the Calcite upgrade, for example,
> > >> > > >> > >Session Window and Count Window.
> > >> > > >> > >2. If we keep avoiding the Calcite upgrade, the gap with the latest
> > >> > > >> > >version will grow. If upgrading one day becomes unavoidable, the pain
> > >> > > >> > >will be greater.
> > >> > > >> > >
> > >> > > >> > >WDYT?
> > >> > > >> > >
> > >> > > >> > >>  Is there another possible way to minimize the change in calcite?
> > >> > > >> > >
> > >> > > >> > >Did you check the 'Other Alternatives' part in FLIP-204? It gives
> > >> > > >> > >another solution, which does not depend on the Calcite upgrade and does
> > >> > > >> > >not need to worry about the hint being lost in propagation.
> > >> > > >> > >This is also what we have done in the internal version.
> > >> > > >> > >The core idea is propagating the 'use_hash' hint to the TableScan with
> > >> > > >> > >matched table names. However, it is a little hacky.
> > >> > > >> > >
> > >> > > >> > >> As I know there're more limitations than `Correlate`.
> > >> > > >> > >
> > >> > > >> > >As mentioned before, in our internal version, I chose the 'Other
> > >> > > >> > >Alternatives' solution in FLIP-204.
> > >> > > >> > >Although I did a POC of solution 1 and listed all the changes I found in
> > >> > > >> > >the FLIP, there may still be something I missed.
> > >> > > >> > >I'm very happy to hear you point out that there are more limitations
> > >> > > >> > >besides `Correlate`; would you please give more details on this part?
> > >> > > >> > >
> > >> > > >> > >Best,
> > >> > > >> > >Jing Zhang
> > >> > > >> > >
> > >> > > >> > >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
> > >> > > >> > >[2]
> > >> > > >> > >https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
> > >> > > >> > >
> > >> > > >> > >On Wed, Dec 29, 2021 at 14:40, Jing Zhang <beyond1...@gmail.com> wrote:
> > >> > > >> > >
> > >> > > >> > >> Hi Yuan and Lincoln,
> > >> > > >> > >> Thanks a lot for the attention. I will answer the emails one by one.
> > >> > > >> > >>
> > >> > > >> > >> To Yuan
> > >> > > >> > >> > How shall we deal with CDC data? If there is CDC data in the pipeline,
> > >> > > >> > >> > IMHO, shuffle by join key will cause CDC data disorder. Will it be better
> > >> > > >> > >> > to use primary key in this case?
> > >> > > >> > >>
> > >> > > >> > >> Good question.
> > >> > > >> > >> The problem exists not only with CDC data sources, but also whenever
> > >> > > >> > >> the input stream is not an insert-only stream (for example, the result of
> > >> > > >> > >> an unbounded aggregate or a regular join).
> > >> > > >> > >> I think hashing by primary key is not a good choice. It would not raise
> > >> > > >> > >> the cache hit ratio, because the cache key is the lookup key rather than
> > >> > > >> > >> the primary key of the input.
> > >> > > >> > >>
> > >> > > >> > >> To avoid wrong results, hash lookup join requires that the input stream
> > >> > > >> > >> be an insert-only stream or that its upsert keys contain the lookup keys.
> > >> > > >> > >>
> > >> > > >> > >> I've added this limitation to the FLIP; thanks a lot for the reminder.
> > >> > > >> > >>
> > >> > > >> > >> > If the shuffle keys can be customized when users have the knowledge
> > >> > > >> > >> > about distribution of data?
> > >> > > >> > >>
> > >> > > >> > >> I'm not sure I understand your question.
> > >> > > >> > >>
> > >> > > >> > >> Do you mean supporting a user-defined partitioner function on keys, just
> > >> > > >> > >> like Flink DataStream sql?
> > >> > > >> > >> If yes, I'm afraid there is no plan to support this feature yet, because
> > >> > > >> > >> the feature involves many things, for example:
> > >> > > >> > >> 1. sql syntax
> > >> > > >> > >> 2. user defined partitioner API
> > >> > > >> > >> 3. RelDistribution type extension and Flink RelDistribution extension
> > >> > > >> > >> 4. FlinkExpandConversionRule
> > >> > > >> > >> 5. Exchange execNode extension
> > >> > > >> > >> 6. ....
> > >> > > >> > >> It needs careful design and more discussion. If this is a strong
> > >> > > >> > >> requirement, we would drive a separate discussion on this point.
> > >> > > >> > >> In this FLIP, I would first support hash shuffle. WDYT?
> > >> > > >> > >>
> > >> > > >> > >> Or do you mean supporting hashing by keys other than the lookup key?
> > >> > > >> > >> If yes, would you please tell me a specific use case?
> > >> > > >> > >> We need to fetch the record from the external storage of the dimension
> > >> > > >> > >> table by lookup key, so those dimension table sources use lookup keys as
> > >> > > >> > >> cache keys.
> > >> > > >> > >> We can only increase the cache hit ratio by shuffling on lookup keys.
> > >> > > >> > >> I need more use cases to understand this requirement.
> > >> > > >> > >>
> > >> > > >> > >> > Some connectors such as hive, caches all data in LookupFunction. How to
> > >> > > >> > >> > decrease the valid cache data size if data can be shuffled?
> > >> > > >> > >>
> > >> > > >> > >> Very good idea.
> > >> > > >> > >> There are two types of cache.
> > >> > > >> > >> For key-value storage, such as Redis/HBase, the lookup table source
> > >> > > >> > >> lazily stores the visited lookup keys and their records in the cache.
> > >> > > >> > >> For other storage without keys, such as Hive, each task eagerly loads all
> > >> > > >> > >> data into the cache in the initialization phase.
> > >> > > >> > >> After introducing the hash partitioner, nothing needs to change for
> > >> > > >> > >> key-value storage; for Hive, each task could load only part of the data
> > >> > > >> > >> instead of loading all of it.
> > >> > > >> > >>
> > >> > > >> > >> We have implemented this optimization in our internal version.
> > >> > > >> > >> The core idea is to push the partitioner information down to the lookup
> > >> > > >> > >> table source. When loading data into caches, each task would store only
> > >> > > >> > >> those records whose lookup keys are sent to the current task.
> > >> > > >> > >> We called this 'HashPartitionedCache'.
> > >> > > >> > >>
> > >> > > >> > >> I have added this point to the Lookup Join requirements list in the
> > >> > > >> > >> motivation of the FLIP, but I will not address it in this FLIP right
> > >> > > >> > >> now.
> > >> > > >> > >> If this is a strong requirement, we need to drive a separate discussion
> > >> > > >> > >> on this topic, because this point involves many API extensions.
> > >> > > >> > >>
> > >> > > >> > >> Best,
> > >> > > >> > >> Jing Zhang
> > >> > > >> > >>
> > >> > > >> > >>
> > >> > > >> > >> On Wed, Dec 29, 2021 at 10:01, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > >> > > >> > >>
> > >> > > >> > >>> Hi Jing,
> > >> > > >> > >>>     Thanks for bringing up this discussion! I agree that these join hints
> > >> > > >> > >>> should benefit both bounded and unbounded cases, as Martijn mentioned.
> > >> > > >> > >>> I also agree that implementing the query hint is the right way for a more
> > >> > > >> > >>> general purpose, since the dynamic table options have a limited scope.
> > >> > > >> > >>>    Some points I'd like to share are:
> > >> > > >> > >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
> > >> > > >> > >>> Things are a little different from RDBMS in the distributed world, and we
> > >> > > >> > >>> also aim to solve the data skew problem, so all these incoming hint names
> > >> > > >> > >>> should be considered together.
> > >> > > >> > >>> 2. As you mentioned in the flip, this solution depends on future changes to
> > >> > > >> > >>> calcite (and also upgrading calcite would be another possible big change:
> > >> > > >> > >>> at least calcite-1.30 vs 1.26, are we preparing to accept this big
> > >> > > >> > >>> change?). Is there another possible way to minimize the change in calcite?
> > >> > > >> > >>> As I know there're more limitations than `Correlate`.
> > >> > > >> > >>>
> > >> > > >> > >>> Best,
> > >> > > >> > >>> Lincoln Lee
> > >> > > >> > >>>
> > >> > > >> > >>>
> > >> > > >> > >>> On Tue, Dec 28, 2021 at 23:04, Jing Zhang <beyond1...@gmail.com> wrote:
> > >> > > >> > >>>
> > >> > > >> > >>> > Hi Martijn,
> > >> > > >> > >>> > Thanks a lot for your attention.
> > >> > > >> > >>> > I'm sorry I didn't explain the motivation clearly. I would like to explain
> > >> > > >> > >>> > it in detail, and then respond to your questions.
> > >> > > >> > >>> > A lookup join is typically used to enrich a table with data that is queried
> > >> > > >> > >>> > from an external system. Many Lookup table sources introduce cache in order
> > >> > > >> > >>> > to reduce the RPC call, such as JDBC, CSV, HBase connectors.
> > >> > > >> > >>> > For those connectors, we could raise cache hit ratio by routing the same
> > >> > > >> > >>> > lookup keys to the same task instance. This is the purpose of
> > >> > > >> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > >> > > >> > >>> > .
> > >> > > >> > >>> > Other cases might benefit from hash distribution, such as batch hash join,
> > >> > > >> > >>> > as you mentioned. It is a cool idea; however, it is not the purpose of this
> > >> > > >> > >>> > FLIP. We could discuss it in FLINK-20670
> > >> > > >> > >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
> > >> > > >> > >>> >
> > >> > > >> > >>> > > - When I was reading about this topic [1] I was wondering if this feature
> > >> > > >> > >>> > > would be more beneficial for bounded use cases and not so much for
> > >> > > >> > >>> > > unbounded use cases. What do you think?
> > >> > > >> > >>> >
> > >> > > >> > >>> > As mentioned before, the purpose of Hash Lookup Join is to increase the
> > >> > > >> > >>> > cache hit ratio, which is different from Oracle Hash Join. However, we
> > >> > > >> > >>> > could use a similar hint syntax.
> > >> > > >> > >>> >
> > >> > > >> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
> > >> > > >> > >>> > > notice that all of the hints there are located at the end of the SQL
> > >> > > >> > >>> > > statement. In the FLIP, the use_hash is defined directly after the 'SELECT'
> > >> > > >> > >>> > > keyword. Can we somehow make this consistent for the user? Or should the
> > >> > > >> > >>> > > user be able to specify hints anywhere in its SQL statement?
> > >> > > >> > >>> >
> > >> > > >> > >>> > Calcite supports hints in two locations [3]:
> > >> > > >> > >>> > Query Hint: right after the SELECT keyword;
> > >> > > >> > >>> > Table Hint: right after the referenced table name.
> > >> > > >> > >>> > Flink already supports dynamic table options based on the hint framework
> > >> > > >> > >>> > of Calcite, as mentioned in doc [2].
> > >> > > >> > >>> > Besides, query hints are also important: they can hint the optimizer
> > >> > > >> > >>> > to choose a better plan. Almost all popular databases and
> > >> > > >> > >>> > big-data engines support SQL query hints, such as Oracle, Hive, Spark and
> > >> > > >> > >>> > so on.
> > >> > > >> > >>> > I think using query hints in this case is more natural for users, WDYT?
> > >> > > >> > >>> >
> > >> > > >> > >>> > I have updated the motivation part in the FLIP.
> > >> > > >> > >>> > Thanks for the feedback!
> > >> > > >> > >>> >
> > >> > > >> > >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > >> > > >> > >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > >> > > >> > >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
> > >> > > >> > >>> >
> > >> > > >> > >>> > Best,
> > >> > > >> > >>> > Jing Zhang
> > >> > > >> > >>> >
> > >> > > >> > >>> > On Tue, Dec 28, 2021 at 22:02, Martijn Visser <mart...@ververica.com> wrote:
> > >> > > >> > >>> >
> > >> > > >> > >>> > > Hi Jing,
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > Thanks a lot for the explanation and the FLIP. I definitely
> > >> > > >> > >>> > > learned something when reading more about `use_hash`. My
> > >> > > >> > >>> > > interpretation would be that the primary benefit of a hash
> > >> > > >> > >>> > > lookup join would be improved performance by allowing the
> > >> > > >> > >>> > > user to explicitly optimise the planner.
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > I have a couple of questions:
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > - When I was reading about this topic [1] I was wondering
> > >> > > >> > >>> > > if this feature would be more beneficial for bounded use
> > >> > > >> > >>> > > cases and not so much for unbounded use cases. What do you
> > >> > > >> > >>> > > think?
> > >> > > >> > >>> > > - If I look at the current documentation for SQL Hints in
> > >> > > >> > >>> > > Flink [2], I notice that all of the hints there are located
> > >> > > >> > >>> > > at the end of the SQL statement. In the FLIP, the use_hash
> > >> > > >> > >>> > > is defined directly after the 'SELECT' keyword. Can we
> > >> > > >> > >>> > > somehow make this consistent for the user? Or should the
> > >> > > >> > >>> > > user be able to specify hints anywhere in its SQL statement?
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > Best regards,
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > Martijn
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > >> > > >> > >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > >> > > >> > >>> > >
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
> > >> > > >> > >>> > >
> > >> > > >> > >>> > > > Hi everyone,
> > >> > > >> > >>> > > > Lookup join
> > >> > > >> > >>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>
> > >> > > >> > >>> > > > [1] is a commonly used feature in Flink SQL. We have
> > >> > > >> > >>> > > > received many optimization requests for lookup join. For
> > >> > > >> > >>> > > > example:
> > >> > > >> > >>> > > > 1. Enforce a hash partitioner on the left side of the
> > >> > > >> > >>> > > > lookup join to raise the cache hit ratio
> > >> > > >> > >>> > > > 2. Solve the data skew problem after introducing hash
> > >> > > >> > >>> > > > lookup join
> > >> > > >> > >>> > > > 3. Enable mini-batch optimization to reduce RPC calls
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > Next we will solve these problems one by one. First, we
> > >> > > >> > >>> > > > will focus on point 1, and continue to discuss point 2 and
> > >> > > >> > >>> > > > point 3 later.
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > There are many similar requests on the user mailing list
> > >> > > >> > >>> > > > and in JIRA about hash lookup join, for example:
> > >> > > >> > >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
> > >> > > >> > >>> > > > Introduce partitioned lookup join to enforce input of
> > >> > > >> > >>> > > > LookupJoin to hash shuffle by lookup keys
> > >> > > >> > >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
> > >> > > >> > >>> > > > lookupjoin source table for pre-partitioning
> > >> > > >> > >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
> > >> > > >> > >>> > > > Support to send data to lookup table for
> > >> > > >> > >>> > > > KeyGroupStreamPartitioner way for SQL.
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > In this FLIP, I would like to start a discussion about
> > >> > > >> > >>> > > > Hash Lookup Join. The core idea is to introduce a
> > >> > > >> > >>> > > > 'USE_HASH' hint in the query. This syntax is directly
> > >> > > >> > >>> > > > user-facing and therefore requires careful design.
> > >> > > >> > >>> > > > There are two ways to propagate this hint to LookupJoin in
> > >> > > >> > >>> > > > the optimizer. We need further discussion to make the
> > >> > > >> > >>> > > > final decision. Anyway, the difference between the two
> > >> > > >> > >>> > > > solutions is only about the internal implementation and
> > >> > > >> > >>> > > > has no impact on the user.
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > For more detail on the proposal:
> > >> > > >> > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > Looking forward to your feedback, thanks.
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > Best,
> > >> > > >> > >>> > > > Jing Zhang
> > >> > > >> > >>> > > >
> > >> > > >> > >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>
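
To make the motivation quoted above a bit more concrete (routing the same lookup keys to the same task instance in order to raise the cache hit ratio), here is a minimal Python sketch. It is only a toy simulation under stated assumptions, not Flink's implementation: the names LruCache and hit_ratio, the parallelism of 4, the cache capacity of 2 and the 8 distinct integer keys are all hypothetical values chosen for illustration.

```python
import random
from collections import OrderedDict


class LruCache:
    """Tiny LRU cache standing in for a lookup source's per-task cache (hypothetical)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()
        self.hits = 0
        self.misses = 0

    def lookup(self, key):
        if key in self.entries:
            # Cache hit: mark the key as most recently used.
            self.entries.move_to_end(key)
            self.hits += 1
        else:
            # Cache miss: this is where a real connector would issue an RPC.
            self.misses += 1
            self.entries[key] = True
            if len(self.entries) > self.capacity:
                # Evict the least recently used entry.
                self.entries.popitem(last=False)


def hit_ratio(keys, parallelism, capacity, route):
    """Send each record to a task chosen by `route` and return the overall hit ratio."""
    caches = [LruCache(capacity) for _ in range(parallelism)]
    for i, key in enumerate(keys):
        caches[route(i, key) % parallelism].lookup(key)
    return sum(c.hits for c in caches) / len(keys)


# Assumed workload: 8000 records drawn from 8 distinct integer lookup keys.
rng = random.Random(42)
keys = [rng.randrange(8) for _ in range(8000)]

# Without a shuffle on the lookup key, any task may see any key
# (modelled here as round-robin on the record index).
round_robin = hit_ratio(keys, 4, 2, lambda i, key: i)

# With a hash shuffle, the same key always lands on the same task.
# hash() of a small non-negative int is the int itself, so this is deterministic.
hashed = hit_ratio(keys, 4, 2, lambda i, key: hash(key))

print(f"round-robin hit ratio: {round_robin:.3f}")
print(f"hash-routed hit ratio: {hashed:.3f}")
```

In this toy setup each task only ever sees two distinct keys once the input is hash-routed, so its small cache can hold all of them, whereas without the shuffle every task sees all eight keys and keeps evicting. The sketch ignores the cost of the extra shuffle and the data skew mentioned in point 2, which are exactly the trade-offs discussed in this thread.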
