Thanks Jing,

Looks good~

Best,
Jingsong

On Fri, Jan 21, 2022 at 2:00 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> Hi, Jing
>   Glad to hear the agreement on the hint syntax, let's keep going!
>
> Best,
> Lincoln Lee
>
>
> Jing Zhang <beyond1...@gmail.com> wrote on Thu, Jan 20, 2022 at 16:52:
>
> > Hi Jingsong,
> > Thanks for the feedback.
> >
> > > Is there a conclusion about naming here? (Maybe I missed something?)
> > > Use USE_HASH or some other names. Slightly confusing in the FLIP.
> >
> > 'SHUFFLE_HASH' is the final hint name; 'USE_HASH' is rejected. I've updated
> > the FLIP.
> >
> > > And the problem of what to write inside the hint, as mentioned by
> > > Lincoln.
> >
> > I agree with Lincoln to include only the 'build' side table name.
> > Besides, Lookup Join only supports the dimension table as the build table.
> > It does not support the left input as the build table, because a lookup
> > join is always triggered by the left side.
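> >
> > A minimal sketch of what that would look like, assuming the build-side-only
> > form described above. The Orders/Customers tables and their columns are
> > just the illustrative ones from Francesco's earlier example, and the exact
> > syntax is still whatever the FLIP ends up specifying:
> >
> > ```sql
> > -- only the dimension (build) table is named inside the hint
> > SELECT /*+ SHUFFLE_HASH('Customers') */ o.order_id, o.total, c.country, c.zip
> > FROM Orders AS o
> > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > ON o.customer_id = c.id;
> > ```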
> >
> > > I think maybe we can list the grammars of other distributed systems,
> > > like Hive, Spark (Databricks), Snowflake?
> >
> > I've added the grammars of other systems (Oracle, Spark, Impala, SQL
> > Server) to the FLIP.
> >
> > [1] Oracle USE_HASH hint
> > <https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683>
> > SELECT /*+ USE_HASH(l h) */ *
> >   FROM orders h, order_items l
> >   WHERE l.order_id = h.order_id
> >     AND l.order_id > 3500;
> >
> >
> > [2] Spark SHUFFLE_HASH hint
> > <
> > https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-qry-select-hints.html
> > >
> > SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
> >
> >
> > [3] Impala SHUFFLE hint
> > <https://impala.apache.org/docs/build/html/topics/impala_hints.html>
> > SELECT straight_join weather.wind_velocity, geospatial.altitude
> >   FROM weather JOIN /* +SHUFFLE */ geospatial
> >   ON weather.lat = geospatial.lat AND weather.long = geospatial.long;
> >
> >
> > [4] SQL Server Hash Keyword
> > <
> > https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
> > >
> > SELECT p.Name, pr.ProductReviewID
> >   FROM Production.Product AS p
> >   LEFT OUTER HASH JOIN Production.ProductReview AS pr
> >   ON p.ProductID = pr.ProductID
> >   ORDER BY ProductReviewID DESC;
> >
> >
> > Hive does not have a similar grammar because shuffle join is the default
> > join behavior of Hive. It only has a map join hint grammar.
> >
> > I haven't found a similar query hint in Snowflake yet.
> >
> >
> > > About `SHUFFLE_HASH(left_table, right_table)`, one case can be shared:
> >
> > > SELECT * FROM left_t
> > >   JOIN right_1 ON ...
> > >   JOIN right_2 ON ...
> > >   JOIN right_3 ON ...
> > >
> > > What if we want to use shuffle_hash for all three joins?
> > >
> > > SELECT /*+ SHUFFLE_HASH('left_t', 'right_1', 'right_2', 'right_3') */ ?
> > >
> > > It does not work, because the left input of the second join is not
> > > 'left_t' anymore. It is the output of the first join.
> >
> > Good point.
> > As mentioned before, the SHUFFLE_HASH hint now only requires the build
> > table name.
> > So in the above case,
> > SELECT /*+ SHUFFLE_HASH('right_1', 'right_2', 'right_3') */
> >   * FROM left_t
> >   JOIN right_1 ON ...
> >   JOIN right_2 ON ...
> >   JOIN right_3 ON ...
> > It means a shuffle is required on every lookup join whose dimension table
> > is named 'right_1', 'right_2' or 'right_3'.
> >
> > WDYT?
> >
> > Best,
> > Jing Zhang
> >
> > Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Jan 20, 2022 at 14:33:
> >
> > > Hi Jing,
> > >
> > > Sorry for the late reply!
> > >
> > > Is there a conclusion about naming here? (Maybe I missed something?)
> > > Use USE_HASH or some other names. Slightly confusing in the FLIP.
> > >
> > > And the problem of what to write inside the hint, as mentioned by
> > > Lincoln.
> > >
> > > I think maybe we can list the grammars of other distributed systems,
> > > like Hive, Spark (Databricks), Snowflake?
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Jan 20, 2022 at 1:56 PM Lincoln Lee <lincoln.8...@gmail.com>
> > > wrote:
> > > >
> > > > Hi, Jing,
> > > >    Sorry for the late reply! The previous discussion of the hint syntax
> > > > left one minor difference open: whether to use the table names of both
> > > > join sides or just the one 'build' side table name. I would prefer the
> > > > latter.
> > > >  Users only need to pass the `build` side table (usually the smaller one)
> > > > into the `SHUFFLE_HASH(build_table)` join hint, which is more concise than
> > > > `SHUFFLE_HASH(left_table, right_table)`. WDYT?
> > > >
> > > > Best,
> > > > Lincoln Lee
> > > >
> > > >
> > > > Jing Zhang <beyond1...@gmail.com> wrote on Sat, Jan 15, 2022 at 17:22:
> > > >
> > > > > Hi all,
> > > > > Thanks for all the feedback so far.
> > > > > If there are no more suggestions, I would like to drive a vote on
> > > > > Tuesday next week (18 Jan).
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Jan 5, 2022 at 11:33:
> > > > >
> > > > > > Hi Francesco,
> > > > > > Thanks a lot for the feedback.
> > > > > >
> > > > > > > > does it make sense for a lookup join to use hash distribution
> > > > > > > > whenever possible by default?
> > > > > > > I prefer to enable the hash lookup join only when the hint is found in
> > > > > > > the query, for the following reasons:
> > > > > > > 1. Plan compatibility
> > > > > > >     Adding a hash shuffle by default would change the plan after
> > > > > > > users upgrade the Flink version.
> > > > > > >     Besides, lookup join is a commonly used feature in Flink SQL.
> > > > > > > 2. Not all Flink jobs would benefit from this improvement.
> > > > > > >     It is a trade-off for lookup joins with dimension connectors
> > > > > > > that have a cache inside.
> > > > > > >     We hope to raise the cache hit ratio with Hash Lookup Join,
> > > > > > > however it would add an extra shuffle at the same time.
> > > > > > >     It is not always a positive optimization, especially for
> > > > > > > connectors that do not have a cache inside.
> > > > > >
> > > > > > > > Shouldn't the hint take the table alias as the "table name"? What if
> > > > > > > > you do two lookup joins in cascade within the same query with the
> > > > > > > > same table (once on a key, then on another one), where you use two
> > > > > > > > different aliases for the table?
> > > > > > > In theory, it's better to support both table names and alias names.
> > > > > > > But in Calcite, the alias name of a subquery or table would be lost
> > > > > > > in the SQL conversion and SQL optimization phases.
> > > > > > > So here we only support table names.
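> > > > > > >
> > > > > > > To make the cascade case concrete, here is a rough sketch, assuming
> > > > > > > purely for illustration a hint that names only the dimension table.
> > > > > > > The `referrer_id` column and the `c1`/`c2` aliases are made up:
> > > > > > >
> > > > > > > ```sql
> > > > > > > -- hypothetical: Customers is looked up twice under two aliases
> > > > > > > SELECT /*+ SHUFFLE_HASH('Customers') */
> > > > > > >   o.order_id,
> > > > > > >   c1.country AS customer_country,
> > > > > > >   c2.country AS referrer_country
> > > > > > > FROM Orders AS o
> > > > > > > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c1
> > > > > > >   ON o.customer_id = c1.id
> > > > > > > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c2
> > > > > > >   ON o.referrer_id = c2.id;
> > > > > > > ```
> > > > > > >
> > > > > > > With table-name matching, I would expect the hint to apply to both
> > > > > > > lookup joins on Customers; it cannot single out only c1 or only c2.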
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > >
> > > > > > > Francesco Guardiani <france...@ververica.com> wrote on Mon, Jan 3, 2022 at 18:38:
> > > > > >
> > > > > >> Hi Jing,
> > > > > >>
> > > > > > >> Thanks for the FLIP. I'm not very knowledgeable about the topic, but
> > > > > > >> going through both the FLIP and the discussion here, I wonder, does
> > > > > > >> it make sense for a lookup join to use hash distribution whenever
> > > > > > >> possible by default?
> > > > > >>
> > > > > >> The point you're explaining here:
> > > > > >>
> > > > > > >> > Many Lookup table sources introduce cache in order
> > > > > > >> > to reduce the RPC call, such as JDBC, CSV, HBase connectors.
> > > > > > >> > For those connectors, we could raise cache hit ratio by routing the
> > > > > > >> > same lookup keys to the same task instance
> > > > > >>
> > > > > > >> Seems something we can infer automatically, rather than manually
> > > > > > >> asking the user to add this hint to the query. Note that I'm not
> > > > > > >> talking against the hint syntax, which might still make sense to be
> > > > > > >> introduced, but I feel like this optimization makes sense in the
> > > > > > >> general case when using the connectors you have quoted. Perhaps
> > > > > > >> there is some downside I'm not aware of?
> > > > > >>
> > > > > > >> Talking about the hints themselves, taking this example as reference:
> > > > > > >>
> > > > > > >> SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total,
> > > > > > >> c.country, c.zip
> > > > > > >> FROM Orders AS o
> > > > > > >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > > > >> ON o.customer_id = c.id;
> > > > > > >>
> > > > > > >> Shouldn't the hint take the table alias as the "table name"? What if
> > > > > > >> you do two lookup joins in cascade within the same query with the
> > > > > > >> same table (once on a key, then on another one), where you use two
> > > > > > >> different aliases for the table?
> > > > > >>
> > > > > >>
> > > > > > >> On Fri, Dec 31, 2021 at 9:56 AM Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hi Lincoln,
> > > > > >> > Thanks for the feedback.
> > > > > >> >
> > > > > >> > > 1. For the hint name, +1 for WenLong's proposal.
> > > > > >> >
> > > > > > >> > I've added 'SHUFFLE_HASH' to the other alternatives in the FLIP.
> > > > > > >> > Let's wait for more voices here.
> > > > > >> >
> > > > > > >> > > Regarding the `SKEW` hint, agree with you that it can be used
> > > > > > >> > > widely, and I prefer to treat it as a metadata hint, a new
> > > > > > >> > > category that differs from a join hint.
> > > > > > >> > > For your example:
> > > > > > >> > > ```
> > > > > > >> > > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
> > > > > > >> > > o.order_id, o.total, c.country, c.zip
> > > > > > >> > > FROM Orders AS o
> > > > > > >> > > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > > > >> > > ON o.customer_id = c.id;
> > > > > > >> > > ```
> > > > > > >> > > I would prefer another form:
> > > > > > >> > > ```
> > > > > > >> > > -- provide the skew info to let the engine choose the optimal plan
> > > > > > >> > > SELECT /*+ SKEW('Orders') */ o.order_id, ...
> > > > > > >> > >
> > > > > > >> > > -- or introduce a new hint for the join case, e.g.,
> > > > > > >> > > SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
> > > > > > >> > > ```
> > > > > >> >
> > > > > > >> > Maybe there is a misunderstanding here.
> > > > > > >> > I just used syntactic sugar here.
> > > > > > >> >
> > > > > > >> > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
> > > > > > >> > o.order_id,
> > > > > > >> > ....
> > > > > > >> >
> > > > > > >> > is just syntactic sugar for
> > > > > > >> >
> > > > > > >> > SELECT /*+ USE_HASH('Orders', 'Customers') */ /*+ SKEW('Orders') */
> > > > > > >> > o.order_id,
> > > > > > >> > ....
> > > > > > >> >
> > > > > > >> > Although I list the 'USE_HASH' and 'SKEW' hints in one query hint
> > > > > > >> > clause, it does not mean they must appear together as a whole.
> > > > > > >> > Based on the Calcite syntax doc [1], you could list more than one
> > > > > > >> > hint in a /*+' hint [, hint ]* '*/ clause.
> > > > > > >> >
> > > > > > >> > Each hint has a different function.
> > > > > > >> > The 'USE_HASH' hint suggests that the optimizer use a hash
> > > > > > >> > partitioner for the Lookup Join of table 'Orders' and table
> > > > > > >> > 'Customers', while the 'SKEW' hint tells the optimizer the skew
> > > > > > >> > metadata about the table 'Orders'.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > Jing Zhang
> > > > > >> >
> > > > > >> > [1] https://calcite.apache.org/docs/reference.html#sql-hints
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > > >> > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Dec 31, 2021 at 16:39:
> > > > > >> >
> > > > > >> > > Hi Martijn,
> > > > > >> > > Thanks for the feedback.
> > > > > >> > >
> > > > > > >> > > Glad to hear that we reached a consensus on the first and second
> > > > > > >> > > points.
> > > > > > >> > >
> > > > > > >> > > About whether to use `use_hash` as a term, I think your concern
> > > > > > >> > > makes sense.
> > > > > > >> > > Although the hash lookup join is similar to the hash join in Oracle
> > > > > > >> > > in that they both require hash distribution on the input, there is
> > > > > > >> > > a small difference between them.
> > > > > > >> > > About this point, Lincoln and WenLong both prefer the term
> > > > > > >> > > 'SHUFFLE_HASH', WDYT?
> > > > > >> > >
> > > > > >> > > Best,
> > > > > >> > > Jing Zhang
> > > > > >> > >
> > > > > >> > >
> > > > > > >> > > Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Dec 30, 2021 at 11:21:
> > > > > >> > >
> > > > > >> > >> Hi Jing,
> > > > > >> > >>     Thanks for your explanation!
> > > > > >> > >>
> > > > > > >> > >> 1. For the hint name, +1 for WenLong's proposal. I think the
> > > > > > >> > >> `SHUFFLE` keyword is important in a classic distributed computing
> > > > > > >> > >> system, where a hash-join usually means there's a shuffle stage
> > > > > > >> > >> (including shuffle hash-join, broadcast hash-join). Users only
> > > > > > >> > >> need to pass the `build` side table (usually the smaller one) into
> > > > > > >> > >> the `SHUFFLE_HASH` join hint, which is more concise than
> > > > > > >> > >> `USE_HASH(left_table, right_table)`. Please correct me if my
> > > > > > >> > >> understanding is wrong.
> > > > > > >> > >> Regarding the `SKEW` hint, agree with you that it can be used
> > > > > > >> > >> widely, and I prefer to treat it as a metadata hint, a new
> > > > > > >> > >> category that differs from a join hint.
> > > > > >> > >> For your example:
> > > > > >> > >> ```
> > > > > >> > >> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
> > > > > >> > o.order_id,
> > > > > >> > >> o.total, c.country, c.zip
> > > > > >> > >> FROM Orders AS o
> > > > > >> > >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > > >> > >> ON o.customer_id = c.id;
> > > > > >> > >> ```
> > > > > >> > >> I would prefer another form:
> > > > > >> > >> ```
> > > > > >> > >> -- provide the skew info to let the engine choose the optimal
> > > plan
> > > > > >> > >> SELECT /*+ SKEW('Orders') */ o.order_id, ...
> > > > > >> > >>
> > > > > >> > >> -- or introduce a new hint for the join case, e.g.,
> > > > > >> > >> SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id,
> > ...
> > > > > >> > >> ```
> > > > > >> > >>
> > > > > > >> > >> 2. Agree with Martijn on adding the feature to 1.16; we need time
> > > > > > >> > >> to complete the change in Calcite and also the upgrading work.
> > > > > > >> > >>
> > > > > > >> > >> 3. I misunderstood the 'Other Alternatives' part as the 'Rejected'
> > > > > > >> > >> ones in the FLIP doc. And my point is avoiding the hacky way with
> > > > > > >> > >> our best effort.
> > > > > > >> > >> As for the potential issues with Calcite's hint propagation: join
> > > > > > >> > >> hints must propagate correctly into the proper join scope,
> > > > > > >> > >> including subqueries or views which may have various SQL
> > > > > > >> > >> operators, so we should check all kinds of operators for correct
> > > > > > >> > >> propagation. Hope this may help. Also cc @Shuo Cheng, who may
> > > > > > >> > >> offer more help.
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >> Best,
> > > > > >> > >> Lincoln Lee
> > > > > >> > >>
> > > > > >> > >>
> > > > > > >> > >> Martijn Visser <mart...@ververica.com> wrote on Wed, Dec 29, 2021 at 22:21:
> > > > > >> > >>
> > > > > >> > >> > Hi Jing,
> > > > > >> > >> >
> > > > > > >> > >> > Thanks for explaining this in more detail and also to others
> > > > > > >> > >> > participating.
> > > > > >> > >> >
> > > > > > >> > >> > > I think using query hints in this case is more natural for
> > > > > > >> > >> > > users, WDYT?
> > > > > > >> > >> >
> > > > > > >> > >> > Yes, I agree. As long as we properly explain in our documentation
> > > > > > >> > >> > that we support both Query Hints and Table Hints, what the
> > > > > > >> > >> > difference between them is and how to use them, I think our users
> > > > > > >> > >> > can understand this perfectly.
> > > > > >> > >> >
> > > > > > >> > >> > > I admit upgrading from Calcite 1.26 to 1.30 would be a big
> > > > > > >> > >> > > change. However we could not always avoid upgrade for the
> > > > > > >> > >> > > following reason
> > > > > > >> > >> >
> > > > > > >> > >> > We have to upgrade Calcite. We actually considered putting that in
> > > > > > >> > >> > the Flink 1.15 scope but ultimately had to drop it, but I
> > > > > > >> > >> > definitely think this needs to be done for 1.16. It's not only
> > > > > > >> > >> > because of new features that depend on Calcite upgrades, but
> > > > > > >> > >> > also because newer versions have resolved bugs that also hurt our
> > > > > > >> > >> > users. That's why we also already have tickets for upgrading to
> > > > > > >> > >> > Calcite 1.27 [1] and 1.28 [2].
> > > > > >> > >> >
> > > > > > >> > >> > With regards to using `use_hash` as a term, I think the most
> > > > > > >> > >> > important part is that if we re-use a term that Oracle is using,
> > > > > > >> > >> > the behaviour and outcome should be the same as or comparable to
> > > > > > >> > >> > Oracle's. If their behaviour and outcome are not the same or
> > > > > > >> > >> > comparable, I would probably introduce our own term to avoid
> > > > > > >> > >> > confusing users.
> > > > > >> > >> >
> > > > > >> > >> > Best regards,
> > > > > >> > >> >
> > > > > >> > >> > Martijn
> > > > > >> > >> >
> > > > > >> > >> > [1] https://issues.apache.org/jira/browse/FLINK-20873
> > > > > >> > >> > [2] https://issues.apache.org/jira/browse/FLINK-21239
> > > > > >> > >> >
> > > > > > >> > >> > On Wed, 29 Dec 2021 at 14:18, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > >> > >> >
> > > > > >> > >> > > Hi Jian gang,
> > > > > >> > >> > > Thanks for the feedback.
> > > > > >> > >> > >
> > > > > > >> > >> > > > When it comes to hive, how do you load partial data instead
> > > > > > >> > >> > > > of the whole data? Any change related with hive?
> > > > > > >> > >> > >
> > > > > > >> > >> > > The question is the same as the one Yuan mentioned before.
> > > > > > >> > >> > > I prefer to drive another FLIP on this topic for separate
> > > > > > >> > >> > > discussion, because this point involves many API extensions.
> > > > > > >> > >> > > Here I would like to first share the implementation in our
> > > > > > >> > >> > > internal version; it may be very different from the final
> > > > > > >> > >> > > solution merged into the community.
> > > > > > >> > >> > > The core idea is to push the partitioner information down to
> > > > > > >> > >> > > the lookup table source.
> > > > > > >> > >> > > The Hive connector also needs upgrading. When loading data into
> > > > > > >> > >> > > caches, each task would only store the records whose lookup
> > > > > > >> > >> > > keys are sent to the current task.
> > > > > >> > >> > >
> > > > > > >> > >> > > > How to define the cache configuration? For example, the size
> > > > > > >> > >> > > > and the ttl.
> > > > > > >> > >> > >
> > > > > > >> > >> > > I'm afraid there is no unified caching configuration or cache
> > > > > > >> > >> > > implementation across the different connectors yet.
> > > > > > >> > >> > > You can find the cache size and TTL config of JDBC in doc [1]
> > > > > > >> > >> > > and of HBase in doc [2].
> > > > > >> > >> > >
> > > > > > >> > >> > > >  Will this feature add another shuffle phase compared with
> > > > > > >> > >> > > > the default behavior? In what situations will user choose
> > > > > > >> > >> > > > this feature?
> > > > > > >> > >> > >
> > > > > > >> > >> > > Yes, if the user specifies the hash hint in the query, the
> > > > > > >> > >> > > optimizer would prefer to choose Hash Lookup Join, which adds a
> > > > > > >> > >> > > hash shuffle.
> > > > > > >> > >> > > If the lookup table source has a cache inside (for example
> > > > > > >> > >> > > HBase/JDBC) and the benefit of a higher cache hit ratio
> > > > > > >> > >> > > outweighs the cost of the extra shuffle, the user could use
> > > > > > >> > >> > > Hash Lookup Join.
> > > > > >> > >> > >
> > > > > > >> > >> > > >  For the keys, the default implementation will be ok. But I
> > > > > > >> > >> > > > wonder whether we can support more flexible strategies.
> > > > > > >> > >> > >
> > > > > > >> > >> > > The question is the same as the one Yuan mentioned before.
> > > > > > >> > >> > >
> > > > > > >> > >> > > I'm afraid there is no plan to support flexible strategies yet,
> > > > > > >> > >> > > because the feature involves many things, for example:
> > > > > > >> > >> > > 1. SQL syntax
> > > > > > >> > >> > > 2. user-defined partitioner API
> > > > > > >> > >> > > 3. RelDistribution type extension and Flink RelDistribution
> > > > > > >> > >> > > extension
> > > > > > >> > >> > > 4. FlinkExpandConversionRule
> > > > > > >> > >> > > 5. Exchange execNode extension
> > > > > > >> > >> > > 6. ....
> > > > > > >> > >> > > It needs a careful design and more discussion. If this is a
> > > > > > >> > >> > > strong requirement, we would drive a separate discussion on
> > > > > > >> > >> > > this point.
> > > > > > >> > >> > > In this FLIP, I would first support hash shuffle. WDYT?
> > > > > >> > >> > >
> > > > > >> > >> > > Best,
> > > > > >> > >> > > Jing Zhang
> > > > > >> > >> > >
> > > > > >> > >> > > [1]
> > > > > >> > >> > >
> > > > > >> > >> > >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > >
> > >
> > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
> > > > > >> > >> > > [2]
> > > > > >> > >> > >
> > > > > >> > >> > >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > >
> > >
> > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options
> > > > > >> > >> > >
> > > > > > >> > >> > > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 29, 2021 at 20:37:
> > > > > >> > >> > >
> > > > > >> > >> > > > Hi Wenlong,
> > > > > >> > >> > > > Thanks for the feedback.
> > > > > > >> > >> > > > I've checked similar syntax in other systems; they are all
> > > > > > >> > >> > > > different from each other. There seems to be no consensus.
> > > > > > >> > >> > > > As mentioned in FLIP-204, Oracle uses a query hint; the hint
> > > > > > >> > >> > > > name is 'use_hash' [1].
> > > > > > >> > >> > > > Spark also uses a query hint; its name is 'SHUFFLE_HASH' [2].
> > > > > > >> > >> > > > SQL Server uses the keyword 'HASH' instead of a query hint [3].
> > > > > > >> > >> > > > Note, the purposes of hash shuffle in [1][2][3] are a little
> > > > > > >> > >> > > > different from the purpose of FLIP-204; we just discuss
> > > > > > >> > >> > > > syntax here.
> > > > > > >> > >> > > >
> > > > > > >> > >> > > > I've added this part to the FLIP, waiting for further
> > > > > > >> > >> > > > discussion.
> > > > > >> > >> > > >
> > > > > >> > >> > > > Best,
> > > > > >> > >> > > > Jing Zhang
> > > > > >> > >> > > >
> > > > > >> > >> > > > [1]
> > > > > >> > >> > > >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > >
> > > https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
> > > > > >> > >> > > > [2]
> > > > > >> > >> > > >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > >
> > > https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
> > > > > >> > >> > > > [3]
> > > > > >> > >> > > >
> > > > > >> > >> > >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > >
> > >
> > https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
> > > > > >> > >> > > >
> > > > > >> > >> > > >
> > > > > > >> > >> > > > wenlong.lwl <wenlong88....@gmail.com> wrote on Wed, Dec 29, 2021 at 17:18:
> > > > > >> > >> > > >
> > > > > >> > >> > > >> Hi, Jing, thanks for driving the discussion.
> > > > > >> > >> > > >>
> > > > > > >> > >> > > >> Have you investigated the syntax of join hints?
> > > > > > >> > >> > > >> Why did you choose USE_HASH from Oracle instead of the Spark
> > > > > > >> > >> > > >> style SHUFFLE_HASH? They are quite different.
> > > > > > >> > >> > > >> People in the big data world may be more familiar with
> > > > > > >> > >> > > >> Spark/Hive; if we need to choose one, personally, I prefer
> > > > > > >> > >> > > >> the Spark style.
> > > > > >> > >> > > >>
> > > > > >> > >> > > >>
> > > > > >> > >> > > >> Best,
> > > > > >> > >> > > >> Wenlong
> > > > > >> > >> > > >>
> > > > > > >> > >> > > >> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com> wrote:
> > > > > >> > >> > > >>
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > Hi Jing,
> > > > > >> > >> > > >> > Thanks for your detail reply.
> > > > > >> > >> > > >> > 1) In the last suggestion, hash by primary key is
> > not
> > > use
> > > > > >> for
> > > > > >> > >> > raising
> > > > > >> > >> > > >> the
> > > > > >> > >> > > >> > cache hit, but handling with skew of left source.
> > Now
> > > that
> > > > > >> you
> > > > > >> > >> have
> > > > > >> > >> > > >> 'skew'
> > > > > >> > >> > > >> > hint and other discussion about it, I'm looking
> > > forward to
> > > > > >> it.
> > > > > >> > >> > > >> > 2) I mean to support user defined partitioner
> > > function. We
> > > > > >> > have a
> > > > > >> > >> > case
> > > > > >> > >> > > >> > that joining a datalake source with special way of
> > > > > >> partition,
> > > > > >> > and
> > > > > >> > >> > have
> > > > > >> > >> > > >> > implemented not elegantly in our internal version.
> > As
> > > you
> > > > > >> said,
> > > > > >> > >> it
> > > > > >> > >> > > needs
> > > > > >> > >> > > >> > more design.
> > > > > >> > >> > > >> > 3) I thing so-called 'HashPartitionedCache' is
> > > usefull,
> > > > > >> > otherwise
> > > > > >> > >> > > >> loading
> > > > > >> > >> > > >> > all data such as hive lookup table source is almost
> > > not
> > > > > >> > >> available in
> > > > > >> > >> > > big
> > > > > >> > >> > > >> > data.
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > Best regards,
> > > > > >> > >> > > >> > Yuan
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > > >> > >> > > >> > At 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
> > > > > >> > >> > > >> > >Hi, Lincoln
> > > > > >> > >> > > >> > >Thanks a lot for the feedback.
> > > > > >> > >> > > >> > >
> > > > > > >> > >> > > >> > >>  Regarding the hint name ‘USE_HASH’, could we consider more
> > > > > > >> > >> > > >> > >> candidates? Things are a little different from RDBMS in the
> > > > > > >> > >> > > >> > >> distributed world, and we also aim to solve the data skew
> > > > > > >> > >> > > >> > >> problem, so all these incoming hints names should be
> > > > > > >> > >> > > >> > >> considered together.
> > > > > >> > >> > > >> > >
> > > > > > >> > >> > > >> > >About the skew problem, I would discuss it in the next FLIP
> > > > > > >> > >> > > >> > >separately. I would like to share the hint proposal for skew
> > > > > > >> > >> > > >> > >here.
> > > > > > >> > >> > > >> > >I want to introduce a 'skew' hint, which is a query hint
> > > > > > >> > >> > > >> > >similar to the skew hint in Spark [1] and MaxCompute [2].
> > > > > > >> > >> > > >> > >The 'skew' hint could contain only the name of the table with
> > > > > > >> > >> > > >> > >skew.
> > > > > > >> > >> > > >> > >Besides, the skew hint could accept a table name and column
> > > > > > >> > >> > > >> > >names.
> > > > > > >> > >> > > >> > >In addition, the skew hint could accept a table name, column
> > > > > > >> > >> > > >> > >names and skew values.
> > > > > > >> > >> > > >> > >For example:
> > > > > >> > >> > > >> > >
> > > > > > >> > >> > > >> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
> > > > > > >> > >> > > >> > >o.order_id, o.total, c.country, c.zip
> > > > > > >> > >> > > >> > >FROM Orders AS o
> > > > > > >> > >> > > >> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > > > >> > >> > > >> > >ON o.customer_id = c.id;
> > > > > >> > >> > > >> > >
> > > > > > >> > >> > > >> > >The 'skew' hint is not only used for lookup join here, but
> > > > > > >> > >> > > >> > >could also be used for other types of join later, for
> > > > > > >> > >> > > >> > >example, batch hash join or streaming regular join.
> > > > > > >> > >> > > >> > >Going back to the question of a better name for hash lookup
> > > > > > >> > >> > > >> > >join: since the 'skew' hint is a separate hint, 'use_hash' is
> > > > > > >> > >> > > >> > >still an alternative.
> > > > > > >> > >> > > >> > >WDYT?
> > > > > > >> > >> > > >> > >I don't have a good idea for a better hint name yet. I would
> > > > > > >> > >> > > >> > >like to hear more suggestions about hint names.
> > > > > >> > >> > > >> > >
> > > > > > >> > >> > > >> > >>  As you mentioned in the flip, this solution depends on
> > > > > > >> > >> > > >> > >> future changes to calcite (and also upgrading calcite would
> > > > > > >> > >> > > >> > >> be another possible big change: at least calcite-1.30 vs
> > > > > > >> > >> > > >> > >> 1.26, are we preparing to accept this big change?).
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >Indeed, solution 1 depends on calcite upgrade.
> > > > > >> > >> > > >> > >I admit upgrade from Calcite 1.26 to 1.30 would be
> > a
> > > big
> > > > > >> > >> change. I
> > > > > >> > >> > > >> still
> > > > > >> > >> > > >> > >remember what we have suffered from last upgrade to
> > > > > Calcite
> > > > > >> > >> 1.26.
> > > > > >> > >> > > >> > >However we could not always avoid upgrade for the
> > > > > following
> > > > > >> > >> reason:
> > > > > >> > >> > > >> > >1. Other features also depends on the Calcite
> > > upgrade.
> > > > > For
> > > > > >> > >> example,
> > > > > >> > >> > > >> > Session
> > > > > >> > >> > > >> > >Window and Count Window.
> > > > > >> > >> > > >> > >2. If we always avoid Calcite upgrade, there would
> > be
> > > > > more
> > > > > >> gap
> > > > > >> > >> with
> > > > > >> > >> > > the
> > > > > >> > >> > > >> > >latest version. One day, if upgrading becomes a
> > thing
> > > > > which
> > > > > >> > has
> > > > > >> > >> to
> > > > > >> > >> > be
> > > > > >> > >> > > >> > done,
> > > > > >> > >> > > >> > >the pain will be greater.
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >WDYT?
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >>  Is there another possible way to minimize the
> > > change
> > > > > in
> > > > > >> > >> calcite?
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >Did you check the 'Other Alternatives' part in the
> > > > > >> FLIP-204? It
> > > > > >> > >> > gives
> > > > > >> > >> > > >> > >another solution which does not depend on calcite
> > > upgrade
> > > > > >> and
> > > > > >> > does
> > > > > >> > >> > not
> > > > > >> > >> > > >> need
> > > > > >> > >> > > >> > >to worry about the hint being lost during
> > > > > propagation.
> > > > > >> > >> > > >> > >This is also what we have done in the internal
> > > version.
> > > > > >> > >> > > >> > >The core idea is propagating the 'use_hash' hint to
> > > TableScan
> > > > > >> with
> > > > > >> > >> > > matched
> > > > > >> > >> > > >> > >table names.  However, it is a little hacky.
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >> As I know there're more limitations than
> > > `Correlate`.
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >As mentioned before, in our internal version, I
> > > choose
> > > > > the
> > > > > >> > >> > 'Other
> > > > > >> > >> > > >> > >Alternatives' part in the FLIP-204.
> > > > > >> > >> > > >> > >Although I did a POC of solution 1 and listed all
> > > > > >> changes I
> > > > > >> > >> found
> > > > > >> > >> > > in
> > > > > >> > >> > > >> the
> > > > > >> > >> > > >> > >FLIP, there may still be something I missed.
> > > > > >> > >> > > >> > >I'm very happy to hear that you point out there're
> > > more
> > > > > >> > >> limitations
> > > > > >> > >> > > >> except
> > > > > >> > >> > > >> > >for `Correlate`, would you please give more details
> > > on
> > > > > this
> > > > > >> > >> part?
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >Best,
> > > > > >> > >> > > >> > >Jing Zhang
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
> > > > > >> > >> > > >> > >[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >On Wed, Dec 29, 2021 at 14:40, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > >> Hi Yuan and Lincoln,
> > > > > >> > >> > > >> > >> thanks a lot for the attention. I would answer
> > the
> > > > > email
> > > > > >> one
> > > > > >> > >> by
> > > > > >> > >> > > one.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> To Yuan
> > > > > >> > >> > > >> > >> > How shall we deal with CDC data? If there is
> > CDC
> > > data
> > > > > >> in
> > > > > >> > the
> > > > > >> > >> > > >> pipeline,
> > > > > >> > >> > > >> > >> IMHO, shuffle by join key will cause CDC data
> > > disorder.
> > > > > >> Will
> > > > > >> > >> it
> > > > > >> > >> > be
> > > > > >> > >> > > >> > better
> > > > > >> > >> > > >> > >> to use primary key in this case?
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> Good question.
> > > > > >> > >> > > >> > >> The problem not only exists in the CDC data
> > > source,
> > > > > but
> > > > > >> > also
> > > > > >> > >> > > exists
> > > > > >> > >> > > >> > when
> > > > > >> > >> > > >> > >> the input stream is not insert-only stream (for
> > > > > example,
> > > > > >> the
> > > > > >> > >> > result
> > > > > >> > >> > > >> of
> > > > > >> > >> > > >> > >> unbounded aggregate or regular join).
> > > > > >> > >> > > >> > >> I think hashing by primary key is not a good
> > > choice.
> > > > > It
> > > > > >> > could
> > > > > >> > >> > not
> > > > > >> > >> > > >> raise
> > > > > >> > >> > > >> > >> the cache hit ratio because the cache key is the lookup key
> > > instead
> > > > > of
> > > > > >> > >> primary
> > > > > >> > >> > > >> key of
> > > > > >> > >> > > >> > >> input.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> To avoid wrong results, hash lookup join requires
> > > that
> > > > > the
> > > > > >> > >> input
> > > > > >> > >> > > >> stream
> > > > > >> > >> > > >> > >> should be an insert-only stream or its upsert keys
> > > > > contain
> > > > > >> > >> lookup
> > > > > >> > >> > > keys.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> I've added this limitation to FLIP, thanks a lot
> > > for
> > > > > >> > >> reminding.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> > If the shuffle keys can be customized  when
> > users
> > > > > have
> > > > > >> the
> > > > > >> > >> > > >> knowledge
> > > > > >> > >> > > >> > >> about distribution of data?
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> I'm not sure I understand your question.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> Do you mean to support user defined partitioner
> > > > > function
> > > > > >> on
> > > > > >> > >> keys
> > > > > >> > >> > > just
> > > > > >> > >> > > >> > like
> > > > > >> > >> > > >> > >> flink DataStream sql?
> > > > > >> > >> > > >> > >> If yes, I'm afraid there is no plan to support
> > this
> > > > > >> feature
> > > > > >> > >> yet
> > > > > >> > >> > > >> because
> > > > > >> > >> > > >> > >> the feature involves many things, for example:
> > > > > >> > >> > > >> > >> 1. sql syntax
> > > > > >> > >> > > >> > >> 2. user defined partitioner API
> > > > > >> > >> > > >> > >> 3. RelDistribution type extension and Flink
> > > > > >> RelDistribution
> > > > > >> > >> > > extension
> > > > > >> > >> > > >> > >> 4. FlinkExpandConversionRule
> > > > > >> > >> > > >> > >> 5. Exchange execNode extension
> > > > > >> > >> > > >> > >> 6. ....
> > > > > >> > >> > > >> > >> It needs careful design and more discussion. If
> > > this is
> > > > > a
> > > > > >> > >> strong
> > > > > >> > >> > > >> > >> requirement, we would drive another discussion on
> > > this
> > > > > >> point
> > > > > >> > >> > > >> > individually.
> > > > > >> > >> > > >> > >> In this FLIP, I would first support hash shuffle.
> > > WDYT?
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> Or do you mean support hash by other keys instead
> > > of
> > > > > >> lookup
> > > > > >> > >> key?
> > > > > >> > >> > > >> > >> If yes, would you please tell me a specific use
> > > case?
> > > > > >> > >> > > >> > >> We need to fetch the record from external storage
> > > of
> > > > > >> > dimension
> > > > > >> > >> > > table
> > > > > >> > >> > > >> by
> > > > > >> > >> > > >> > >> look up key, so those dimension table source uses
> > > look
> > > > > up
> > > > > >> > >> keys as
> > > > > >> > >> > > >> cache
> > > > > >> > >> > > >> > >> key.
> > > > > >> > >> > > >> > >> We could only increase the cache hit ratio by
> > shuffling on
> > > > > lookup
> > > > > >> > >> keys.
> > > > > >> > >> > > >> > >> I need more use cases to understand this
> > > requirement.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> > Some connectors, such as hive, cache all data
> > in
> > > > > >> > >> > LookupFunction.
> > > > > >> > >> > > >> How
> > > > > >> > >> > > >> > to
> > > > > >> > >> > > >> > >> decrease the valid cache data size if data can be
> > > > > >> shuffled?
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> Very good idea.
> > > > > >> > >> > > >> > >> There are two types of cache.
> > > > > >> > >> > > >> > >> For Key-Value storage, such as Redis/HBase, the
> > > lookup
> > > > > >> table
> > > > > >> > >> > source
> > > > > >> > >> > > >> > stores
> > > > > >> > >> > > >> > >> the visited lookup keys and their records into
> > cache
> > > > > >> lazily.
> > > > > >> > >> > > >> > >> For other storage without keys, such as hive,
> > each
> > > task
> > > > > >> > loads
> > > > > >> > >> all
> > > > > >> > >> > > >> data
> > > > > >> > >> > > >> > >> into cache eagerly in the initialize phase.
> > > > > >> > >> > > >> > >> After introducing the hash partitioner, for key-value
> > > > > storages,
> > > > > >> > >> there
> > > > > >> > >> > is
> > > > > >> > >> > > no
> > > > > >> > >> > > >> > need
> > > > > >> > >> > > >> > >> to change; for hive, each task could only load
> > > part of
> > > > > >> cache
> > > > > >> > >> > > instead
> > > > > >> > >> > > >> of
> > > > > >> > >> > > >> > >> loading all of the cache.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> We have implemented this optimization in our
> > > internal
> > > > > >> > version.
> > > > > >> > >> > > >> > >> The core idea is to push the partitioner information
> > > down
> > > > > to
> > > > > >> > the
> > > > > >> > >> > > lookup
> > > > > >> > >> > > >> > table
> > > > > >> > >> > > >> > >> source. When loading data into caches, each task
> > > could
> > > > > >> only
> > > > > >> > >> store
> > > > > >> > >> > > >> those
> > > > > >> > >> > > >> > >> records whose lookup keys are sent to the current task.
> > > > > >> > >> > > >> > >> We called this 'HashPartitionedCache'.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> I have added this point into the Lookup Join
> > > > > requirements
> > > > > >> > >> list in
> > > > > >> > >> > > the
> > > > > >> > >> > > >> > >> motivation of the FLIP, but I would not do this
> > > point
> > > > > in
> > > > > >> > this
> > > > > >> > >> > FLIP
> > > > > >> > >> > > >> right
> > > > > >> > >> > > >> > >> now.
> > > > > >> > >> > > >> > >> If this is a strong requirement, we need to drive
> > > another
> > > > > >> > >> discussion
> > > > > >> > >> > > on
> > > > > >> > >> > > >> > this
> > > > > >> > >> > > >> > >> topic individually because this point involves
> > many
> > > > > >> > extensions
> > > > > >> > >> on
> > > > > >> > >> > > API.
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> Best,
> > > > > >> > >> > > >> > >> Jing Zhang
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >> On Wed, Dec 29, 2021 at 10:01, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> > >>> Hi Jing,
> > > > > >> > >> > > >> > >>>     Thanks for bringing up this discussion!
> > Agree
> > > > > that
> > > > > >> > these
> > > > > >> > >> > join
> > > > > >> > >> > > >> hints
> > > > > >> > >> > > >> > >>> should benefit both bounded and unbounded cases
> > as
> > > > > >> Martijn
> > > > > >> > >> > > mentioned.
> > > > > >> > >> > > >> > >>> I also agree that implementing the query hint is
> > > the
> > > > > >> right
> > > > > >> > >> way
> > > > > >> > >> > > for a
> > > > > >> > >> > > >> > more
> > > > > >> > >> > > >> > >>> general purpose since the dynamic table options
> > > have a
> > > > > >> > limited
> > > > > >> > >> > > scope.
> > > > > >> > >> > > >> > >>>    Some points I'd like to share are:
> > > > > >> > >> > > >> > >>> 1. Regarding the hint name ‘USE_HASH’, could we
> > > > > consider
> > > > > >> > more
> > > > > >> > >> > > >> > candidates?
> > > > > >> > >> > > >> > >>> Things are a little different from RDBMS in the
> > > > > >> distributed
> > > > > >> > >> > world,
> > > > > >> > >> > > >> and
> > > > > >> > >> > > >> > we
> > > > > >> > >> > > >> > >>> also aim to solve the data skew problem, so all
> > > these
> > > > > >> > >> incoming
> > > > > >> > >> > > hint
> > > > > >> > >> > > >> > names
> > > > > >> > >> > > >> > >>> should be considered together.
> > > > > >> > >> > > >> > >>> 2. As you mentioned in the flip, this solution
> > > depends
> > > > > >> on
> > > > > >> > >> future
> > > > > >> > >> > > >> > changes
> > > > > >> > >> > > >> > >>> to
> > > > > >> > >> > > >> > >>> calcite (and also upgrading calcite would be
> > > another
> > > > > >> > possible
> > > > > >> > >> > big
> > > > > >> > >> > > >> > change:
> > > > > >> > >> > > >> > >>> at least calcite-1.30 vs 1.26, are we preparing
> > > to
> > > > > >> accept
> > > > > >> > >> this
> > > > > >> > >> > > big
> > > > > >> > >> > > >> > >>> change?). Is there another possible way to
> > > minimize
> > > > > the
> > > > > >> > >> change
> > > > > >> > >> > in
> > > > > >> > >> > > >> > calcite?
> > > > > >> > >> > > >> > >>> As I know there're more limitations than
> > > `Correlate`.
> > > > > >> > >> > > >> > >>>
> > > > > >> > >> > > >> > >>> Best,
> > > > > >> > >> > > >> > >>> Lincoln Lee
> > > > > >> > >> > > >> > >>>
> > > > > >> > >> > > >> > >>>
> > > > > >> > >> > > >> > >>> On Tue, Dec 28, 2021 at 23:04, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > >> > >> > > >> > >>>
> > > > > >> > >> > > >> > >>> > Hi Martijn,
> > > > > >> > >> > > >> > >>> > Thanks a lot for your attention.
> > > > > >> > >> > > >> > >>> > I'm sorry I didn't explain the motivation
> > > clearly. I
> > > > > >> > would
> > > > > >> > >> > like
> > > > > >> > >> > > to
> > > > > >> > >> > > >> > >>> explain
> > > > > >> > >> > > >> > >>> > it in detail, and then respond to your
> > > > > >> questions.
> > > > > >> > >> > > >> > >>> > A lookup join is typically used to enrich a
> > > table
> > > > > with
> > > > > >> > data
> > > > > >> > >> > that
> > > > > >> > >> > > >> is
> > > > > >> > >> > > >> > >>> queried
> > > > > >> > >> > > >> > >>> > from an external system. Many Lookup table
> > > sources
> > > > > >> > >> introduce
> > > > > >> > >> > > >> cache in
> > > > > >> > >> > > >> > >>> order
> > > > > >> > >> > > >> > >>> > to reduce RPC calls, such as the JDBC, CSV, and
> > HBase
> > > > > >> > >> connectors.
> > > > > >> > >> > > >> > >>> > For those connectors, we could raise cache hit
> > > ratio
> > > > > >> by
> > > > > >> > >> > routing
> > > > > >> > >> > > >> the
> > > > > >> > >> > > >> > same
> > > > > >> > >> > > >> > >>> > lookup keys to the same task instance. This is
> > > the
> > > > > >> > purpose
> > > > > >> > >> of
> > > > > >> > >> > > >> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > > > > >> > >> > > >> > >>> > .
> > > > > >> > >> > > >> > >>> > Other cases might benefit from Hash
> > > distribution,
> > > > > >> such as
> > > > > >> > >> > batch
> > > > > >> > >> > > >> hash
> > > > > >> > >> > > >> > >>> join
> > > > > >> > >> > > >> > >>> > as you mentioned. It is a cool idea, however
> > it
> > > is
> > > > > not
> > > > > >> > the
> > > > > >> > >> > > >> purpose of
> > > > > >> > >> > > >> > >>> this
> > > > > >> > >> > > >> > >>> > FLIP, we could discuss this in FLINK-20670
> > > > > >> > >> > > >> > >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > > - When I was reading about this topic [1] I
> > > was
> > > > > >> > >> wondering if
> > > > > >> > >> > > >> this
> > > > > >> > >> > > >> > >>> feature
> > > > > >> > >> > > >> > >>> > would be more beneficial for bounded use cases
> > > and
> > > > > >> not so
> > > > > >> > >> much
> > > > > >> > >> > > for
> > > > > >> > >> > > >> > >>> > unbounded use cases. What do you think?
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > As mentioned before, the purpose of Hash
> > Lookup
> > > Join
> > > > > >> is
> > > > > >> > to
> > > > > >> > >> > > >> increase
> > > > > >> > >> > > >> > the
> > > > > >> > >> > > >> > >>> > cache hit ratio which is different from Oracle
> > > Hash
> > > > > >> Join.
> > > > > >> > >> > > However
> > > > > >> > >> > > >> we
> > > > > >> > >> > > >> > >>> could
> > > > > >> > >> > > >> > >>> > use a similar hint syntax.
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > > - If I look at the current documentation for
> > > SQL
> > > > > >> Hints
> > > > > >> > in
> > > > > >> > >> > > Flink
> > > > > >> > >> > > >> > [2], I
> > > > > >> > >> > > >> > >>> > notice that all of the hints there are located
> > > at
> > > > > the
> > > > > >> end
> > > > > >> > >> of
> > > > > >> > >> > the
> > > > > >> > >> > > >> SQL
> > > > > >> > >> > > >> > >>> > statement. In the FLIP, the use_hash is
> > defined
> > > > > >> directly
> > > > > >> > >> after
> > > > > >> > >> > > the
> > > > > >> > >> > > >> > >>> 'SELECT'
> > > > > >> > >> > > >> > >>> > keyword. Can we somehow make this consistent
> > > for the
> > > > > >> > user?
> > > > > >> > >> Or
> > > > > >> > >> > > >> should
> > > > > >> > >> > > >> > the
> > > > > >> > >> > > >> > >>> > user be able to specify hints anywhere in its
> > > SQL
> > > > > >> > >> statement?
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > Calcite supports hints in two locations [3]:
> > > > > >> > >> > > >> > >>> > Query Hint: right after the SELECT keyword;
> > > > > >> > >> > > >> > >>> > Table Hint: right after the referenced table
> > > name.
> > > > > >> > >> > > >> > >>> > Flink already supports dynamic table options
> > > based
> > > > > on
> > > > > >> > the
> > > > > >> > >> > Hint
> > > > > >> > >> > > >> > >>> framework
> > > > > >> > >> > > >> > >>> > of Calcite which is mentioned in doc[2].
> > > > > >> > >> > > >> > >>> > Besides, query hints are also important; they
> > > could
> > > > > >> give a
> > > > > >> > >> hint
> > > > > >> > >> > > for
> > > > > >> > >> > > >> > >>> > optimizers to choose a better plan. Almost all
> > > > > popular
> > > > > >> > >> > databases
> > > > > >> > >> > > >> and
> > > > > >> > >> > > >> > >>> > big-data engines support sql query hints, such
> > > as
> > > > > >> oracle,
> > > > > >> > >> > hive,
> > > > > >> > >> > > >> spark
> > > > > >> > >> > > >> > >>> and
> > > > > >> > >> > > >> > >>> > so on.
> > > > > >> > >> > > >> > >>> > I think using query hints in this case is more
> > > > > natural
> > > > > >> > for
> > > > > >> > >> > > users,
> > > > > >> > >> > > >> > WDYT?
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > I have updated the motivation part in the
> > FLIP,
> > > > > >> > >> > > >> > >>> > Thanks for the feedback!
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > > > > >> > >> > > >> > >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > > > > >> > >> > > >> > >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > Best,
> > > > > >> > >> > > >> > >>> > Jing Zhang
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > On Tue, Dec 28, 2021 at 22:02, Martijn Visser <mart...@ververica.com> wrote:
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>> > > Hi Jing,
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > Thanks a lot for the explanation and the
> > > FLIP. I
> > > > > >> > >> definitely
> > > > > >> > >> > > >> learned
> > > > > >> > >> > > >> > >>> > > something when reading more about
> > `use_hash`.
> > > My
> > > > > >> > >> > > interpretation
> > > > > >> > >> > > >> > would
> > > > > >> > >> > > >> > >>> be
> > > > > >> > >> > > >> > >>> > > that the primary benefit of a hash lookup
> > join
> > > > > >> would be
> > > > > >> > >> > > improved
> > > > > >> > >> > > >> > >>> > > performance by allowing the user to
> > explicitly
> > > > > >> optimise
> > > > > >> > >> the
> > > > > >> > >> > > >> > planner.
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > I have a couple of questions:
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > - When I was reading about this topic [1] I
> > > was
> > > > > >> > >> wondering if
> > > > > >> > >> > > >> this
> > > > > >> > >> > > >> > >>> feature
> > > > > >> > >> > > >> > >>> > > would be more beneficial for bounded use
> > > cases and
> > > > > >> not
> > > > > >> > so
> > > > > >> > >> > much
> > > > > >> > >> > > >> for
> > > > > >> > >> > > >> > >>> > > unbounded use cases. What do you think?
> > > > > >> > >> > > >> > >>> > > - If I look at the current documentation for
> > > SQL
> > > > > >> Hints
> > > > > >> > in
> > > > > >> > >> > > Flink
> > > > > >> > >> > > >> > [2], I
> > > > > >> > >> > > >> > >>> > > notice that all of the hints there are
> > > located at
> > > > > >> the
> > > > > >> > >> end of
> > > > > >> > >> > > the
> > > > > >> > >> > > >> > SQL
> > > > > >> > >> > > >> > >>> > > statement. In the FLIP, the use_hash is
> > > defined
> > > > > >> > directly
> > > > > >> > >> > after
> > > > > >> > >> > > >> the
> > > > > >> > >> > > >> > >>> > 'SELECT'
> > > > > >> > >> > > >> > >>> > > keyword. Can we somehow make this consistent
> > > for
> > > > > the
> > > > > >> > >> user?
> > > > > >> > >> > Or
> > > > > >> > >> > > >> > should
> > > > > >> > >> > > >> > >>> the
> > > > > >> > >> > > >> > >>> > > user be able to specify hints anywhere in
> > its
> > > SQL
> > > > > >> > >> statement?
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > Best regards,
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > Martijn
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
> > > > > >> > >> > > >> > >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> > > > Hi everyone,
> > > > > >> > >> > > >> > >>> > > > Lookup join
> > > > > >> > >> > > >> > >>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>[1]
> > > > > >> > >> > > >> > >>> > > > is a commonly used feature in Flink SQL. We
> > have
> > > > > >> received
> > > > > >> > >> many
> > > > > >> > >> > > >> > >>> optimization
> > > > > >> > >> > > >> > >>> > > > requirements on look up join. For example:
> > > > > >> > >> > > >> > >>> > > > 1. Enforces left side of lookup join do a
> > > hash
> > > > > >> > >> partitioner
> > > > > >> > >> > > to
> > > > > >> > >> > > >> > raise
> > > > > >> > >> > > >> > >>> > cache
> > > > > >> > >> > > >> > >>> > > > hit ratio
> > > > > >> > >> > > >> > >>> > > > 2. Solves the data skew problem after
> > > introducing
> > > > > >> hash
> > > > > >> > >> > lookup
> > > > > >> > >> > > >> join
> > > > > >> > >> > > >> > >>> > > > 3. Enables mini-batch optimization to
> > > reduce RPC
> > > > > >> calls
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > Next we will solve these problems one by
> > > one.
> > > > > >> > >> Firstly, we
> > > > > >> > >> > > >> would
> > > > > >> > >> > > >> > >>> focus
> > > > > >> > >> > > >> > >>> > on
> > > > > >> > >> > > >> > >>> > > > point 1, and continue to discuss point 2
> > and
> > > > > >> point 3
> > > > > >> > >> > later.
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > There are many similar requirements from
> > > user
> > > > > mail
> > > > > >> > list
> > > > > >> > >> > and
> > > > > >> > >> > > >> JIRA
> > > > > >> > >> > > >> > >>> about
> > > > > >> > >> > > >> > >>> > > hash
> > > > > >> > >> > > >> > >>> > > > Lookup Join, for example:
> > > > > >> > >> > > >> > >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
> > > > > >> > >> > > >> > >>> > > > Introduce partitioned lookup join to enforce input of LookupJoin to hash
> > > > > >> > >> > > >> > >>> > > > shuffle by lookup keys
> > > > > >> > >> > > >> > >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
> > > > > >> > >> > > >> > >>> > > > lookupjoin source table for pre-partitioning
> > > > > >> > >> > > >> > >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
> > > > > >> > >> > > >> > >>> > > > Support to send data to lookup table for KeyGroupStreamPartitioner way for
> > > > > >> > >> > > >> > >>> > > > SQL.
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > In this FLIP, I would like to start a
> > > discussion
> > > > > >> > about
> > > > > >> > >> > Hash
> > > > > >> > >> > > >> > Lookup
> > > > > >> > >> > > >> > >>> > Join.
> > > > > >> > >> > > >> > >>> > > > The core idea is introducing a 'USE_HASH'
> > > hint
> > > > > in
> > > > > >> > >> query.
> > > > > >> > >> > > This
> > > > > >> > >> > > >> > >>> syntax
> > > > > >> > >> > > >> > >>> > is
> > > > > >> > >> > > >> > >>> > > > directly user-oriented and therefore
> > > requires
> > > > > >> careful
> > > > > >> > >> > > design.
> > > > > >> > >> > > >> > >>> > > > There are two ways about how to propagate
> > > this
> > > > > >> hint
> > > > > >> > to
> > > > > >> > >> > > >> > LookupJoin in
> > > > > >> > >> > > >> > >>> > > > optimizer. We need further discussion to
> > make the
> > > > > final
> > > > > >> > >> decision.
> > > > > >> > >> > > >> Anyway,
> > > > > >> > >> > > >> > >>> the
> > > > > >> > >> > > >> > >>> > > > difference between the two solutions is
> > only
> > > > > about
> > > > > >> the
> > > > > >> > >> > > internal
> > > > > >> > >> > > >> > >>> > > > implementation and has no impact on the
> > > user.
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > For more detail on the proposal:
> > > > > >> > >> > > >> > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > Looking forward to your feedback, thanks.
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > Best,
> > > > > >> > >> > > >> > >>> > > > Jing Zhang
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > > > > >> > >> > > >> > >>> > > >
> > > > > >> > >> > > >> > >>> > >
> > > > > >> > >> > > >> > >>> >
> > > > > >> > >> > > >> > >>>
> > > > > >> > >> > > >> > >>
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >>
> > > > > >> > >> > > >
> > > > > >> > >> > >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > >
> > >
> > >
> >
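
The motivation given at the start of the thread (routing equal lookup keys to the same task instance to raise the cache hit ratio) can be illustrated with a small simulation. This is only a sketch under stated assumptions, not Flink code: `LruCache`, `hit_ratio`, `hash_route` and `make_random_route` are hypothetical names, each task is modelled as one small LRU cache, and CRC32 merely stands in for whatever partitioner the engine would actually use.

```python
import random
import zlib
from collections import OrderedDict


class LruCache:
    """Tiny LRU cache that only counts hits and misses (hypothetical helper)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()
        self.hits = 0
        self.misses = 0

    def lookup(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
            self.hits += 1
        else:
            self.misses += 1
            self.entries[key] = True  # stands in for a row fetched via RPC
            if len(self.entries) > self.capacity:
                self.entries.popitem(last=False)  # evict least recently used


def hit_ratio(keys, parallelism, capacity, route):
    """Send every key to the task chosen by `route` and return the overall hit ratio."""
    caches = [LruCache(capacity) for _ in range(parallelism)]
    for key in keys:
        caches[route(key, parallelism)].lookup(key)
    return sum(cache.hits for cache in caches) / len(keys)


def hash_route(key, parallelism):
    # Deterministic hash: equal lookup keys always reach the same task.
    return zlib.crc32(str(key).encode("utf-8")) % parallelism


def make_random_route(seed):
    # Models an arbitrary distribution where a key may land on any task.
    rng = random.Random(seed)
    return lambda key, parallelism: rng.randrange(parallelism)


if __name__ == "__main__":
    rng = random.Random(42)
    keys = [rng.randrange(400) for _ in range(20000)]
    print("random routing:", hit_ratio(keys, 4, 100, make_random_route(7)))
    print("hash routing:  ", hit_ratio(keys, 4, 100, hash_route))
```

With hash routing each task only ever sees its own share of the key space, so a cache of the same size covers a much larger fraction of the keys that task is asked for.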

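
The 'HashPartitionedCache' idea mentioned in the thread (each task keeps only the records whose lookup keys are routed to it, instead of loading the whole dimension table) can be sketched as follows. The thread gives no API for it, so `owner_task` and `load_partitioned_cache` are hypothetical names, and CRC32 is an assumed stand-in: in a real system the cache loader would have to use exactly the same partitioner as the shuffle.

```python
import zlib


def owner_task(lookup_key, parallelism):
    # Assumption: must match the partitioner used to shuffle the left input.
    return zlib.crc32(str(lookup_key).encode("utf-8")) % parallelism


def load_partitioned_cache(rows, key_of, task_index, parallelism):
    """Build the eager cache for one task, keeping only the rows it owns."""
    cache = {}
    for row in rows:
        key = key_of(row)
        if owner_task(key, parallelism) == task_index:
            cache.setdefault(key, []).append(row)
    return cache


if __name__ == "__main__":
    # A toy dimension table of (id, name) rows, looked up by id.
    dim_rows = [(i, "name-%d" % i) for i in range(1000)]
    for task in range(4):
        cache = load_partitioned_cache(dim_rows, lambda r: r[0], task, 4)
        print("task", task, "caches", len(cache), "of", len(dim_rows), "keys")
```

Every key is owned by exactly one task, so the per-task caches together hold the table once rather than once per task.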