Sorry for the typo: "so it would effect existed sql behavior" should read "so it
would not affect existing SQL behavior."


Jing Zhang <beyond1...@gmail.com> 于2021年12月22日周三 12:36写道:

> Hi Jinpeng,
> Thanks for response.
> I think we are describing different solutions to the data skew problem;
> please correct me if I am wrong.
>
> What you describe covers cases where the hot keys are known in advance and
> stored in metadata, so we could query whether there is data skew, and the
> skewed values, from RelMetadataQuery. Hive uses this kind of optimization.
>
> What I describe is an alternative approach for cases where we cannot list the
> hot keys in advance. Just like the hot words of a search engine: we know that
> this phenomenon exists, but there is no way to enumerate the hot words
> beforehand.
> To avoid data skew under normal hash distribution, we introduce a special
> hash distribution. It sends records with the same hash key to one of N
> downstream tasks instead of always the same one.
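To make the idea concrete, here is a minimal, self-contained sketch (not actual Flink or Calcite API; class and method names are hypothetical) of a channel selector that spreads one key over BUCKET_NUM downstream channels:

```java
import java.util.Random;

// Hypothetical sketch of the "hash_random" distribution: instead of always
// routing a key to channel hash(key) % numChannels, pick one of bucketNum
// consecutive channels at random, so a hot key fans out across several tasks.
class HashRandomPartitioner {
    private final int bucketNum;    // how many channels share one key
    private final int numChannels;  // downstream parallelism
    private final Random random;

    HashRandomPartitioner(int bucketNum, int numChannels, Random random) {
        this.bucketNum = bucketNum;
        this.numChannels = numChannels;
        this.random = random;
    }

    // Pick one of the bucketNum channels assigned to this key at random.
    int selectChannel(Object key) {
        int base = Math.floorMod(key.hashCode(), numChannels);
        int salt = random.nextInt(bucketNum);
        return (base + salt) % numChannels;
    }
}
```

Note that records with the same key still land in a bounded, deterministic set of channels (base .. base + bucketNum - 1, modulo numChannels), which is what lets the other join input replicate to exactly those channels rather than broadcasting everywhere.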
>
> I would add a new distribution type, but only trigger it under specified
> conditions, so it would not affect existing SQL behavior.
> For example, a user could enable this optimization by specifying a hint, like
> the following.
> > select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> > from default_catalog.default_database.probe as p
> > join partition_table_3 /*+ PARTITIONED_JOIN('BUCKET_NUM'= '16')*/ for system_time as of p.proc_time as b
> > on p.x=b.x and p.y=b.y
>
> WDYT?
>
> Best,
> Jing Zhang
>
> Jinpeng Wu <wjpabc...@gmail.com> 于2021年12月22日周三 11:45写道:
>
>> Hi, Jing.  I still don't get your point of adding new distribution types.
>>
>> I think what you need is a new kind of metadata indicating whether there are
>> skewed values. By looking it up through RelMetadataQuery, you may boost the
>> cost of a two-pass agg or shuffled join (or even make it infinite) and let
>> other implementations win.
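A self-contained sketch of the cost-boosting idea (illustrative names only, no real Calcite classes; in practice this decision would go through RelMetadataQuery and the planner's cost model):

```java
// Illustrative only: if metadata reports skew on the shuffle keys, penalize
// the shuffled implementation so the planner picks an alternative plan.
class SkewAwareCostModel {
    static double shuffleCost(double baseCost, boolean keysAreSkewed) {
        // Boosting the cost to infinity effectively vetoes the shuffled plan.
        return keysAreSkewed ? Double.POSITIVE_INFINITY : baseCost;
    }
}
```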
>>
>> Adding new shuffle types is not as simple as it appears to be. You may need
>> to adjust the traits passthrough and derivation logic. And compared with a
>> metadata query, you may need to maintain the skewed values inside operators'
>> implementation rules, which introduces difficulties for further maintenance.
>>
>> Thanks
>>
>> On Wed, Dec 22, 2021 at 11:05 AM Jing Zhang <beyond1...@gmail.com> wrote:
>>
>> > Hi Julian,
>> > Make sense.
>> > Then a newly added RelDistribution type requires a strong reason.
>> > I have created a JIRA [1] to track this requirement.
>> >
>> > [1] https://issues.apache.org/jira/browse/CALCITE-4957
>> >
>> > Best,
>> > Jing Zhang
>> >
>> > Julian Hyde <jhyde.apa...@gmail.com> 于2021年12月22日周三 08:04写道:
>> >
>> > > I think you should contribute a change that adds a new value to the
>> enum.
>> > > I know that enums are not easily extensible, but in cases like this,
>> that
>> > > can be a feature rather than a bug.
>> > >
>> > > There are not very many distribution types, and new distribution types
>> > are
>> > > rarely invented. Requiring people to contribute to the enum is a
>> useful
>> > > forcing function: the various groups who use Calcite are forced to
>> > describe
>> > > their use cases, and when people discover that they have the same use
>> > > cases, we tend to get reusable code.
>> > >
>> > > Converting an enum to an interface makes things a lot less concrete.
>> It
>> > is
>> > > more difficult to reason about a piece of code, and there are bugs
>> > because
>> > > you can’t review a ’switch’ expression and say ‘yes, that covers all
>> > cases’.
>> > >
>> > > Julian
>> > >
>> > > > On Dec 21, 2021, at 12:33 AM, Jing Zhang <beyond1...@gmail.com>
>> wrote:
>> > > >
>> > > > Hi community,
>> > > > I hope to extend `RelDistribution` to support more distribution types
>> > > > in order to address data skew under the normal hash distribution.
>> > > >
>> > > > When we use hash distribution to bring all records with the same hash
>> > > > key to the same place, job performance is poor when hot keys exist.
>> > > > One way to solve this problem is to send records with a hot key to one
>> > > > of several downstream tasks, chosen at random.
>> > > > In HashJoin, we could use this random hash partitioning on one side;
>> > > > for the other input to the join, records relating to the hot key need
>> > > > to be replicated to all downstream tasks handling that key.
>> > > > In HashAggregate, we could split the aggregate into partial-final
>> > > > phases if all the aggregation functions support splitting.
>> > > > Chapter 10 of the book "Designing Data-Intensive Applications" also
>> > > > describes this solution to data skew.
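The HashJoin treatment above can be sketched as the counterpart to random hash partitioning (hypothetical names, not real engine API): while the skewed side picks one of bucketNum channels at random, the other side must replicate each record to all bucketNum channels that might hold its key:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the other join input's routing: a record whose key
// is spread over bucketNum channels must be sent to all of them, so that
// every partial partition of the hot key can still find its join matches.
class ReplicatingPartitioner {
    private final int bucketNum;    // must match the hash_random side
    private final int numChannels;  // downstream parallelism

    ReplicatingPartitioner(int bucketNum, int numChannels) {
        this.bucketNum = bucketNum;
        this.numChannels = numChannels;
    }

    // All channels that may receive records for this key on the skewed side.
    List<Integer> selectChannels(Object key) {
        int base = Math.floorMod(key.hashCode(), numChannels);
        List<Integer> out = new ArrayList<>();
        for (int salt = 0; salt < bucketNum; salt++) {
            out.add((base + salt) % numChannels);
        }
        return out;
    }
}
```

The replication factor is bounded by bucketNum, so the extra cost is a tunable trade-off against the skew it removes.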
>> > > >
>> > > > Anyway, we should first extend `RelDistribution` to support more
>> > > > distribution types, for example, a hash-random type.
>> > > > However, `RelDistribution.Type` is an enum class, which is not
>> > > > extensible.
>> > > > I would rather not add the new types to the enum `RelDistribution.Type`
>> > > > directly.
>> > > > I prefer to refactor `RelDistribution.Type` to make it extensible and
>> > > > add the new types in a subclass in the external execution engine
>> > > > (e.g. Flink).
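One way such a refactor could look (purely illustrative; these are not actual Calcite classes, and Julian's concern below about losing exhaustive switch checking applies): keep the core kinds in an enum that implements a small interface, and let an engine contribute its own kinds in another enum:

```java
// Hypothetical sketch of making the distribution type extensible.
interface DistributionType {
    String shortName();
}

// Core kinds stay together, analogous to today's RelDistribution.Type enum.
enum CoreDistributionType implements DistributionType {
    HASH_DISTRIBUTED("hash"),
    BROADCAST_DISTRIBUTED("broadcast"),
    ANY("any");

    private final String shortName;

    CoreDistributionType(String shortName) { this.shortName = shortName; }

    @Override public String shortName() { return shortName; }
}

// An engine such as Flink could then add engine-specific kinds.
enum FlinkDistributionType implements DistributionType {
    HASH_RANDOM("hash_random");

    private final String shortName;

    FlinkDistributionType(String shortName) { this.shortName = shortName; }

    @Override public String shortName() { return shortName; }
}
```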
>> > > >
>> > > > For example, there is a lookup join in Flink [1], which is typically
>> > > > used to enrich a table with data that is queried from an external
>> > > > system.
>> > > > For the following query
>> > > >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
>> > > >> from default_catalog.default_database.probe as p
>> > > >> join partition_table_3 for system_time as of p.proc_time as b
>> > > >> on p.x=b.x and p.y=b.y
>> > > >
>> > > > When using the normal hash distribution, the logical plan is as follows:
>> > > >   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
>> > > >      +- Exchange(distribution=[hash[x, y]])
>> > > >         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
>> > > >
>> > > > If the data-skew solution is enabled via the hint, the logical plan is as follows:
>> > > >   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
>> > > >      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
>> > > >         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
>> > > >
>> > > > What do you think?
>> > > >
>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>> > > >
>> > > > Best,
>> > > > Jing Zhang
>> > >
>> > >
>> >
>>
>
