Hi Jinpeng,
Thanks a lot for your response.
> If you don't handle trait derivations correctly, your new queries may not
> perform as well as those old queries using classic distribution types.

Exactly, thanks for reminding me. I will handle trait derivations
carefully for the newly added distribution type.

> Metadata queries can not only return concrete values, but also properties.
> Like what areColumnsUnique does, you may just use a "skewInfo" metadata
> which returns a boolean value (or together with some other information if
> available).

I don't doubt that MetadataQuery could provide this feature. However, I
prefer to introduce a new distribution type because it is conceptually
cleaner.
A hash-random distribution is a shuffle type that is different from the
normal hash distribution.
If we reuse the hash distribution type instead of introducing a new one,
trait derivation could go wrong: a hash-random distribution cannot satisfy
a normal hash distribution, and a hash distribution cannot satisfy a
hash-random distribution either.
We need to take care of this when removing redundant shuffles during
optimization.
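A rough sketch of the satisfaction rule I have in mind (the type and method
names here are illustrative placeholders, not Calcite's actual
RelDistribution API):

```java
// Illustrative placeholders only, not Calcite's actual API: a minimal
// model of why neither type may satisfy the other.
enum DistType { HASH_DISTRIBUTED, HASH_RANDOM, ANY }

class TraitCheck {
    static boolean satisfies(DistType delivered, DistType required) {
        if (required == DistType.ANY) {
            return true; // ANY accepts every distribution
        }
        // HASH_RANDOM spreads one key over N tasks, so it cannot guarantee
        // that equal keys meet in a single task (what HASH_DISTRIBUTED
        // promises); plain hash gives no skew-spreading guarantee either.
        // Hence a distribution only satisfies a requirement of its own type.
        return delivered == required;
    }
}
```

With such a rule, the planner would keep the Exchange when the required and
delivered distributions differ, instead of wrongly removing the shuffle.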

WDYT?

Best,
Jing Zhang


Jinpeng Wu <wjpabc...@gmail.com> wrote on Wed, Dec 22, 2021 at 13:04:

> Hi, Jing.
>
> I'm not worried about existing queries being affected; I am just
> providing some suggestions. If you don't handle trait derivations
> correctly, your new queries may not perform as well as those old queries
> using classic distribution types.
>
> Metadata queries can not only return concrete values, but also properties.
> Like what areColumnsUnique does, you may just use a "skewInfo" metadata
> which returns a boolean value (or together with some other information if
> available).
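> A hypothetical shape for such a metadata (plain Java; these names are
> assumptions for illustration, not Calcite's actual Metadata API):

```java
import java.util.List;

// Hypothetical "skewInfo" metadata: a boolean plus optional detail,
// mirroring the suggestion above. Not Calcite's actual Metadata interface.
class SkewInfo {
    final boolean skewed;            // are there skewed values at all?
    final List<Object> skewedValues; // known hot values, if available

    SkewInfo(boolean skewed, List<Object> skewedValues) {
        this.skewed = skewed;
        this.skewedValues = skewedValues;
    }
}

interface SkewInfoHandler {
    // Analogous in shape to RelMetadataQuery.areColumnsUnique(rel, columns):
    // answer the skew question for a given node and its key columns.
    SkewInfo getSkewInfo(Object relNode, List<Integer> keyColumns);
}
```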
>
> Regards,
> Jinpeng Wu
>
> On Wed, Dec 22, 2021 at 12:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Sorry for the typo:
> > "so it would affect existing SQL behavior" => "so it would not affect
> > existing SQL behavior".
> >
> >
> > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 22, 2021 at 12:36:
> >
> > > Hi Jinpeng,
> > > Thanks for the response.
> > > I guess we are talking about different solutions to data skew; please
> > > correct me if I am wrong.
> > >
> > > What you describe covers cases where hot keys are known in advance and
> > > stored in metadata, so we could query whether there is data skew, and
> > > the skewed values, from RelMetadataQuery. This optimization is used in
> > > Hive.
> > >
> > > What I describe is an alternative approach for cases where hot keys
> > > cannot be listed in advance. Just like the hot words of a search
> > > engine: we know the phenomenon exists, but there is no way to list the
> > > hot words beforehand.
> > > To avoid data skew under the normal hash distribution, we introduce a
> > > special hash distribution. It sends records with the same hash key to
> > > one of N downstream tasks instead of always the same one.
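> > > A minimal sketch of this routing (the names and the bucket scheme are
> > > my assumptions for illustration, not Flink's actual API):

```java
import java.util.concurrent.ThreadLocalRandom;

// Minimal sketch of the hash_random routing described above. Names and
// the bucket scheme are illustrative assumptions, not Flink's API.
class HashRandomRouter {
    // Plain hash: equal keys always land on the same task.
    static int plainHash(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // hash_random: equal keys land on one of bucketNum tasks, chosen at
    // random, starting from the task that plain hash would pick.
    static int hashRandom(Object key, int parallelism, int bucketNum) {
        int base = plainHash(key, parallelism);
        int offset = ThreadLocalRandom.current().nextInt(bucketNum);
        return (base + offset) % parallelism;
    }
}
```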
> > >
> > > I would add a new distribution type but only trigger it under
> > > specified conditions, so it would not affect existing SQL behavior.
> > > For example, a user could enable this optimization by specifying a
> > > hint, like the following.
> > > > select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> > > > from default_catalog.default_database.probe as p
> > > > join partition_table_3 /*+ PARTITIONED_JOIN('BUCKET_NUM' = '16') */
> > > >   for system_time as of p.proc_time as b
> > > > on p.x = b.x and p.y = b.y
> > >
> > > WDYT?
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > Jinpeng Wu <wjpabc...@gmail.com> wrote on Wed, Dec 22, 2021 at 11:45:
> > >
> > >> Hi, Jing. I still don't get your point of adding new distribution
> > >> types.
> > >>
> > >> I think what you need is a new metadata indicating whether there are
> > >> skewed values. By looking it up through RelMetadataQuery, you may
> > >> boost (or even make it INF) the cost of a two-pass agg or shuffled
> > >> join and make other implementations win.
> > >>
> > >> Adding new shuffle types is not as simple as it appears to be. You
> > >> may need to adjust the trait passthrough and derivation logic. And
> > >> compared with a metadata query, you may need to maintain the skew
> > >> values in the operators' implementation rules, which introduces
> > >> difficulties for further maintenance.
> > >>
> > >> Thanks
> > >>
> > >> On Wed, Dec 22, 2021 at 11:05 AM Jing Zhang <beyond1...@gmail.com>
> > wrote:
> > >>
> > >> > Hi Julian,
> > >> > Makes sense.
> > >> > Then a newly added RelDistribution type requires a strong reason.
> > >> > I have created a JIRA [1] to track this requirement.
> > >> >
> > >> > [1] https://issues.apache.org/jira/browse/CALCITE-4957
> > >> >
> > >> > Best,
> > >> > Jing Zhang
> > >> >
> > >> > Julian Hyde <jhyde.apa...@gmail.com> wrote on Wed, Dec 22, 2021 at 08:04:
> > >> >
> > >> > > I think you should contribute a change that adds a new value to
> > >> > > the enum. I know that enums are not easily extensible, but in
> > >> > > cases like this, that can be a feature rather than a bug.
> > >> > >
> > >> > > There are not very many distribution types, and new distribution
> > >> > > types are rarely invented. Requiring people to contribute to the
> > >> > > enum is a useful forcing function: the various groups who use
> > >> > > Calcite are forced to describe their use cases, and when people
> > >> > > discover that they have the same use cases, we tend to get
> > >> > > reusable code.
> > >> > >
> > >> > > Converting an enum to an interface makes things a lot less
> > >> > > concrete. It is more difficult to reason about a piece of code,
> > >> > > and there are bugs because you can’t review a ‘switch’ expression
> > >> > > and say ‘yes, that covers all cases’.
> > >> > >
> > >> > > Julian
> > >> > >
> > >> > > > On Dec 21, 2021, at 12:33 AM, Jing Zhang <beyond1...@gmail.com>
> > >> wrote:
> > >> > > >
> > >> > > > Hi community,
> > >> > > > I hope to extend `RelDistribution` to support more distribution
> > >> > > > types in order to solve data skew in the normal hash
> > >> > > > distribution.
> > >> > > >
> > >> > > > When we use hash distribution to bring all records with the
> > >> > > > same hash key to the same place, the job performance would be
> > >> > > > poor if there exist hot keys.
> > >> > > > There is a solution to this problem: we could send a hot key
> > >> > > > to one of several downstream tasks, chosen at random.
> > >> > > > In HashJoin, we could use random hash partitioning on one side;
> > >> > > > for the other input to the join, records relating to the hot
> > >> > > > key need to be replicated to all downstream tasks handling
> > >> > > > that key.
> > >> > > > In HashAggregate, we could split the aggregate into
> > >> > > > partial-final if all the aggregation functions support
> > >> > > > splitting.
> > >> > > > The 10th chapter of the book "Designing Data-Intensive
> > >> > > > Applications" also describes this solution to data skew.
> > >> > > >
> > >> > > > Anyway, we should first extend `RelDistribution` to support
> > >> > > > more distribution types, for example, a hash-random type.
> > >> > > > However, `RelDistribution.Type` is an enum class, which is not
> > >> > > > extensible.
> > >> > > > I would rather not add the new types to the enum
> > >> > > > `RelDistribution.Type` directly.
> > >> > > > I prefer to refactor `RelDistribution.Type` to make it
> > >> > > > extensible and add the new types in a subclass in the external
> > >> > > > execution engine (e.g. Flink).
> > >> > > >
> > >> > > > For example, there is a lookup join in Flink [1], which is
> > >> > > > typically used to enrich a table with data queried from an
> > >> > > > external system.
> > >> > > > For the following query
> > >> > > >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> > >> > > >> from default_catalog.default_database.probe as p
> > >> > > >> join partition_table_3 for system_time as of p.proc_time as b
> > >> > > >> on p.x=b.x and p.y=b.y
> > >> > > >
> > >> > > > When using the normal hash distribution, the logical plan is
> > >> > > > as follows:
> > >> > > >    +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y],
> > >> > > >       select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> > >> > > >       +- Exchange(distribution=[hash[x, y]])
> > >> > > >          +- TableSourceScan(table=[[default_catalog,
> > >> > > >             default_database, probe, source:
> > >> > > >             [CollectionTableSource(x, y)]]], fields=[x, y])
> > >> > > >
> > >> > > > If the data-skew solution is enabled via a hint, the logical
> > >> > > > plan is as follows:
> > >> > > >    +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y],
> > >> > > >       select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> > >> > > >       +- Exchange(distribution=[hash_random(key=[x, y],
> > >> > > >          bucket_num=8)])
> > >> > > >          +- TableSourceScan(table=[[default_catalog,
> > >> > > >             default_database, probe, source:
> > >> > > >             [CollectionTableSource(x, y)]]], fields=[x, y])
> > >> > > >
> > >> > > > What do you think?
> > >> > > >
> > >> > > > [1]
> > >> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > >> > > >
> > >> > > > Best,
> > >> > > > Jing Zhang
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
> >
>
