Hi Jinpeng,
Thanks for the response.
I think we are describing different solutions to data skew; please correct me
if I am wrong.

What you describe works for cases where the hot keys are known in advance and
stored in metadata, so we can query RelMetadataQuery for whether there is data
skew and which values are skewed. Hive uses this optimization.
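
A minimal sketch of the cost idea behind that approach (boosting, or even setting to INF, the cost of a shuffled join when skewed values are known, as Jinpeng suggests below). It is only an illustration under stated assumptions: the skew metadata is modeled as a plain map, the cost is "rows on the busiest task", and `HOT_FACTOR` is an arbitrary threshold. None of this is Calcite's real RelMetadataQuery or cost API.

```java
import java.util.Map;

// Hypothetical sketch: skew metadata is a plain map and cost is the load of
// the busiest task; neither is Calcite's real metadata or cost API.
public class SkewAwareCostSketch {

    // Assumed threshold: a value is "too hot" if its rows exceed this multiple
    // of the average per-task load.
    static final double HOT_FACTOR = 4.0;

    // skewedValueRows: hypothetical metadata, skewed key value -> estimated rows.
    public static double shuffledJoinCost(double rowCount, int parallelism,
                                          Map<?, Double> skewedValueRows) {
        double averageLoad = rowCount / parallelism;
        double hottest = 0.0;
        for (double rows : skewedValueRows.values()) {
            hottest = Math.max(hottest, rows);
        }
        if (hottest > HOT_FACTOR * averageLoad) {
            // Every row of the hot value lands on one task: make this plan lose.
            return Double.POSITIVE_INFINITY;
        }
        // Otherwise the busiest task handles the larger of the two loads.
        return Math.max(averageLoad, hottest);
    }

    public static void main(String[] args) {
        System.out.println(shuffledJoinCost(1_000_000, 100, Map.of()));                    // prints 10000.0
        System.out.println(shuffledJoinCost(1_000_000, 100, Map.of("hot", 500_000.0)));    // prints Infinity
    }
}
```

Returning infinity rather than a large finite number simply guarantees that any alternative plan (for example one that handles the known skewed values separately) wins.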

What I describe is an alternative approach for cases where we cannot list the
hot keys in advance. It is like the hot words of a search engine: we know the
phenomenon exists, but there is no way to list the hot words in advance.
To avoid data skew under the normal hash distribution, we introduce a special
hash distribution. It sends records with the same hash key to one of N
downstream instances instead of always to the same one.
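
A minimal sketch of how a downstream channel could be chosen under such a distribution. It assumes the N candidates for a key are the N channels starting at the key's ordinary hash channel; the class name `HashRandomSelector` and that choice of candidates are illustrative assumptions, not Flink's actual partitioner.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical "hash random" channel selector: records with the same key go to
// one of bucketNum consecutive channels, chosen at random, instead of always
// to the single channel picked by plain hash distribution.
public class HashRandomSelector {
    private final int numChannels; // downstream parallelism
    private final int bucketNum;   // N: how many channels share one key

    public HashRandomSelector(int numChannels, int bucketNum) {
        if (bucketNum < 1 || bucketNum > numChannels) {
            throw new IllegalArgumentException("need 1 <= bucketNum <= numChannels");
        }
        this.numChannels = numChannels;
        this.bucketNum = bucketNum;
    }

    // Plain hash distribution: the key alone decides the channel.
    public int hashChannel(List<?> key) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Hash random distribution: the hash channel plus a random offset in [0, bucketNum).
    public int selectChannel(List<?> key) {
        int offset = ThreadLocalRandom.current().nextInt(bucketNum);
        return (hashChannel(key) + offset) % numChannels;
    }

    public static void main(String[] args) {
        HashRandomSelector selector = new HashRandomSelector(32, 8);
        List<Object> hotKey = List.of(1, "a"); // e.g. the join key (x, y)
        for (int i = 0; i < 5; i++) {
            System.out.println(selector.selectChannel(hotKey));
        }
    }
}
```

A hot key now spreads its load over up to N tasks, while any single key still touches at most N channels, which keeps the replication needed on the other side of a join bounded.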

I add a new distribution type, but it is only triggered under specified
conditions, so it would not affect the behavior of existing SQL.
For example, users could enable this optimization by specifying a hint, as
follows:
> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> from default_catalog.default_database.probe as p
> join partition_table_3 /*+ PARTITIONED_JOIN('BUCKET_NUM'= '16')*/
> for system_time as of p.proc_time as b
> on p.x=b.x and p.y=b.y

WDYT?

Best,
Jing Zhang

Jinpeng Wu <wjpabc...@gmail.com> wrote on Wed, Dec 22, 2021 at 11:45:

> Hi, Jing. I still don't see the point of adding new distribution types.
>
> I think what you need is new metadata indicating whether there are skewed
> values. By looking it up through RelMetadataQuery, you may boost (or even
> make it INF) the cost of a two-pass agg or shuffled join so that other
> implementations win.
>
> Adding new shuffle types is not as simple as it appears. You may need to
> adjust the trait pass-through and derivation logic. And compared with a
> metadata query, you may need to maintain the skew values in the operators'
> implementation rules, which makes further maintenance more difficult.
>
> Thanks
>
> On Wed, Dec 22, 2021 at 11:05 AM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Hi Julian,
> > Makes sense.
> > So a newly added RelDistribution type requires a strong reason.
> > I have created a JIRA [1] to track this requirement.
> >
> > [1] https://issues.apache.org/jira/browse/CALCITE-4957
> >
> > Best,
> > Jing Zhang
> >
> > Julian Hyde <jhyde.apa...@gmail.com> wrote on Wed, Dec 22, 2021 at 08:04:
> >
> > > I think you should contribute a change that adds a new value to the enum.
> > > I know that enums are not easily extensible, but in cases like this, that
> > > can be a feature rather than a bug.
> > >
> > > There are not very many distribution types, and new distribution types are
> > > rarely invented. Requiring people to contribute to the enum is a useful
> > > forcing function: the various groups who use Calcite are forced to describe
> > > their use cases, and when people discover that they have the same use
> > > cases, we tend to get reusable code.
> > >
> > > Converting an enum to an interface makes things a lot less concrete. It is
> > > more difficult to reason about a piece of code, and there are bugs because
> > > you can’t review a ‘switch’ expression and say ‘yes, that covers all cases’.
> > >
> > > Julian
> > >
> > > > On Dec 21, 2021, at 12:33 AM, Jing Zhang <beyond1...@gmail.com> wrote:
> > > >
> > > > Hi community,
> > > > I hope to extend `RelDistribution` to support more distribution types in
> > > > order to solve data skew in the normal hash distribution.
> > > >
> > > > When we use hash distribution to bring all records with the same hash key
> > > > to the same place, job performance can be poor if there are hot keys.
> > > > One solution to this problem is to send a hot key to one of several
> > > > downstream tasks, chosen at random.
> > > > In HashJoin, we could use a random hash partition on one side; for the
> > > > other input to the join, records relating to the hot key need to be
> > > > replicated to all downstream tasks handling that key.
> > > > In HashAggregate, we could split the aggregate into partial and final
> > > > phases if all the aggregation functions support splitting.
> > > > Chapter 10 of the book "Designing Data-Intensive Applications" also
> > > > describes this solution to data skew.
> > > >
> > > > Anyway, we should first extend `RelDistribution` to support more
> > > > distribution types, for example, a hash random type.
> > > > However, `RelDistribution.Type` is an enum, which is not extensible.
> > > > I would rather not add the new types to the enum `RelDistribution.Type`
> > > > directly. I prefer to refactor `RelDistribution.Type` to make it
> > > > extensible and add the new types in a subclass in the external execution
> > > > engine (e.g. Flink).
> > > >
> > > > For example, Flink has a lookup join [1], which is typically used to
> > > > enrich a table with data queried from an external system.
> > > > Consider the following query:
> > > >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> > > >> from default_catalog.default_database.probe as p
> > > >> join partition_table_3 for system_time as of p.proc_time as b
> > > >> on p.x=b.x and p.y=b.y
> > > >
> > > > With the normal hash distribution, the logical plan is as follows:
> > > >   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> > > >      +- Exchange(distribution=[hash[x, y]])
> > > >         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> > > >
> > > > If the data skew solution is enabled through a hint, the logical plan is
> > > > as follows:
> > > >   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> > > >      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
> > > >         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> > > >
> > > > What do you think?
> > > >
> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > > >
> > > > Best,
> > > > Jing Zhang
> > >
> > >
> >
>
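
Julian's point above about reviewing a `switch` and confirming it covers all cases can be illustrated in plain Java (14+ switch expressions). The enum below is a local stand-in modeled on the constants of Calcite's `RelDistribution.Type` as we understand them, not the real class, and `colocatesEqualKeys` is a hypothetical helper.

```java
// Local stand-in modeled on Calcite's RelDistribution.Type (not the real class).
public class DistributionSwitchSketch {

    public enum DistributionType {
        SINGLETON,
        HASH_DISTRIBUTED,
        RANGE_DISTRIBUTED,
        RANDOM_DISTRIBUTED,
        ROUND_ROBIN_DISTRIBUTED,
        BROADCAST_DISTRIBUTED,
        ANY
    }

    // True if any two records with equal keys are guaranteed to share a task.
    public static boolean colocatesEqualKeys(DistributionType type) {
        // No 'default' branch: a switch expression over an enum must be
        // exhaustive, so adding a constant (say, a hypothetical HASH_RANDOM)
        // makes this method fail to compile until the new case is handled.
        return switch (type) {
            case SINGLETON, HASH_DISTRIBUTED, RANGE_DISTRIBUTED, BROADCAST_DISTRIBUTED -> true;
            case RANDOM_DISTRIBUTED, ROUND_ROBIN_DISTRIBUTED, ANY -> false;
        };
    }

    public static void main(String[] args) {
        for (DistributionType type : DistributionType.values()) {
            System.out.println(type + " -> " + colocatesEqualKeys(type));
        }
    }
}
```

With an open interface instead of an enum, the compiler cannot perform this check, which is the loss of concreteness Julian describes.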

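The HashJoin variant in the original proposal (random hash on one input, replication on the other) can be sketched as channel routing. The names are illustrative assumptions. The sketch applies the split to every key, since a hash_random exchange does not need a list of hot keys; the price is that the other input is replicated `bucketNum` times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical join routing for a hash_random exchange.
// Skewed input: each record goes to one of bucketNum channels, chosen at random.
// Other input: each record is copied to all bucketNum channels of its key,
// so every skewed-side record still meets all matching other-side records.
public class SkewedJoinRouting {
    private final int numChannels;
    private final int bucketNum;

    public SkewedJoinRouting(int numChannels, int bucketNum) {
        if (bucketNum < 1 || bucketNum > numChannels) {
            throw new IllegalArgumentException("need 1 <= bucketNum <= numChannels");
        }
        this.numChannels = numChannels;
        this.bucketNum = bucketNum;
    }

    private int baseChannel(Object key) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Skewed input: pick one candidate channel at random.
    public int routeSkewedSide(Object key) {
        int offset = ThreadLocalRandom.current().nextInt(bucketNum);
        return (baseChannel(key) + offset) % numChannels;
    }

    // Other input: send a copy to every candidate channel of the key.
    public List<Integer> routeOtherSide(Object key) {
        List<Integer> channels = new ArrayList<>(bucketNum);
        for (int i = 0; i < bucketNum; i++) {
            channels.add((baseChannel(key) + i) % numChannels);
        }
        return channels;
    }

    public static void main(String[] args) {
        SkewedJoinRouting routing = new SkewedJoinRouting(32, 8);
        Object key = List.of(1, "a");
        System.out.println("skewed side -> " + routing.routeSkewedSide(key));
        System.out.println("other side  -> " + routing.routeOtherSide(key));
    }
}
```
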
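
The partial/final split for HashAggregate mentioned in the original proposal can be sketched for SUM and COUNT, with AVG derived from them at the end. The names are illustrative assumptions. Assigning rows to partial tasks at random stands in for a hash_random exchange. A real engine would shuffle the partial results by key alone before the final phase; here a single map merges them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical partial/final aggregation for skewed keys.
public class PartialFinalAggSketch {

    public record Row(String key, long value) {}

    // Decomposable accumulator: partial results can be merged in any order.
    public record Acc(long sum, long count) {
        Acc merge(Acc other) {
            return new Acc(sum + other.sum, count + other.count);
        }
    }

    public static Map<String, Acc> aggregate(List<Row> rows, int bucketNum) {
        // Phase 1 (partial): each row lands on a random partial task, which
        // accumulates SUM and COUNT per key, so no single task sees every hot row.
        List<Map<String, Acc>> partials = new ArrayList<>();
        for (int i = 0; i < bucketNum; i++) {
            partials.add(new HashMap<>());
        }
        for (Row row : rows) {
            int task = ThreadLocalRandom.current().nextInt(bucketNum);
            partials.get(task).merge(row.key(), new Acc(row.value(), 1), Acc::merge);
        }
        // Phase 2 (final): merge the partial accumulators per key.
        Map<String, Acc> result = new HashMap<>();
        for (Map<String, Acc> partial : partials) {
            partial.forEach((key, acc) -> result.merge(key, acc, Acc::merge));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("hot", 1), new Row("hot", 2),
                new Row("hot", 3), new Row("cold", 10));
        aggregate(rows, 4).forEach((key, acc) -> System.out.println(
                key + " sum=" + acc.sum() + " avg=" + (double) acc.sum() / acc.count()));
    }
}
```

This only works when every aggregate is decomposable like SUM and COUNT. An aggregate such as an exact median cannot be split this way, which is why the proposal requires that all the aggregation functions support splitting.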