Hi Jinpeng,

Thanks a lot for your response.

> If you don't handle trait derivations correctly, your new queries may not
> perform as well as those old queries using classic distribution types.
Exactly, thanks for reminding me. I will handle trait derivations carefully
for the newly added distribution type.

> Metadata queries can not only return concrete values, but also properties.
> Like what areColumnsUnique does, you may just use a "skewInfo" metadata
> which returns a boolean value (or together with some other information if
> available).

I don't doubt that MetadataQuery could provide this feature. However, I
prefer to introduce a new distribution type because it is more conceptually
reasonable. A hash-random distribution is a shuffle type different from the
normal hash distribution. If we reuse the hash distribution type instead of
introducing a new one, trait derivation could go wrong: a hash-random
distribution cannot satisfy a normal hash distribution, and a hash
distribution cannot satisfy a hash-random distribution either. We need to
take care of this when applying optimizations that remove redundant
shuffles. WDYT?

Best,
Jing Zhang

Jinpeng Wu <wjpabc...@gmail.com> wrote on Wednesday, Dec 22, 2021 at 13:04:

> Hi, Jing.
>
> I'm not worrying about existing queries being affected. But I am just
> providing some suggestions. If you don't handle trait derivations
> correctly, your new queries may not perform as well as those old queries
> using classic distribution types.
>
> Metadata queries can not only return concrete values, but also properties.
> Like what areColumnsUnique does, you may just use a "skewInfo" metadata
> which returns a boolean value (or together with some other information if
> available).
>
> Regards,
> Jinpeng Wu
>
> On Wed, Dec 22, 2021 at 12:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Sorry for the typo:
> > "so it would effect existed sql behavior" => "it would not effect existed
> > sql behavior".
> >
> > Jing Zhang <beyond1...@gmail.com> wrote on Wednesday, Dec 22, 2021 at 12:36:
> >
> > > Hi Jinpeng,
> > > Thanks for the response.
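The mutual non-satisfiability argued above can be sketched in Java. This is a minimal illustration with hypothetical type and method names; Calcite's real `RelDistribution.satisfies` also compares keys and handles `ANY`, `SINGLETON`, and the other built-in types:

```java
// Sketch only: HASH_RANDOM is the proposed hypothetical type, not part of
// Calcite. Demonstrates why the two hash variants must not satisfy each other.
public class DistributionSketch {
    enum Type { HASH, HASH_RANDOM, ANY }

    /** Returns whether a relation distributed as 'produced' also meets the
     *  'required' distribution (the check used to remove redundant exchanges). */
    static boolean satisfies(Type produced, Type required) {
        if (required == Type.ANY) {
            return true;  // every concrete distribution satisfies ANY
        }
        // HASH guarantees all records with one key land on one task;
        // HASH_RANDOM deliberately spreads one key over N tasks, so
        // neither trait can stand in for the other.
        return produced == required;
    }

    public static void main(String[] args) {
        System.out.println(satisfies(Type.HASH_RANDOM, Type.HASH));   // false
        System.out.println(satisfies(Type.HASH, Type.HASH_RANDOM));   // false
        System.out.println(satisfies(Type.HASH, Type.HASH));          // true
    }
}
```

The point is that neither direction returns true, so an optimizer that drops an Exchange because its input "already satisfies" the required distribution cannot conflate the two traits.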
> > >
> > > I guess we are talking about different solutions to data skew; please
> > > correct me if I am wrong.
> > >
> > > What you describe is for cases where the hot keys are known in advance
> > > and stored in metadata, so we could query whether there is data skew,
> > > and the skewed values, from RelMetadataQuery. That optimization is used
> > > in Hive.
> > >
> > > What I describe is an alternative approach for cases where we cannot
> > > list the hot keys in advance. Just like the hot words of a search
> > > engine, we know that the phenomenon exists, but there is no way to list
> > > the hot words beforehand.
> > > To avoid data skew under normal hash distribution, we introduce a
> > > special hash distribution. It sends records with the same hash key to
> > > one of N downstream tasks instead of always the same one.
> > >
> > > I add a new distribution type, but only trigger this distribution type
> > > under specified conditions, so it would effect existed sql behavior.
> > > For example, the user could enable this optimization by specifying a
> > > hint, like the following:
> > > > select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> > > > from default_catalog.default_database.probe as p
> > > > join partition_table_3 /*+ PARTITIONED_JOIN('BUCKET_NUM'= '16')*/ for
> > > > system_time as of p.proc_time as b
> > > > on p.x=b.x and p.y=b.y
> > >
> > > WDYT?
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > Jinpeng Wu <wjpabc...@gmail.com> wrote on Wednesday, Dec 22, 2021 at 11:45:
> > >
> > >> Hi, Jing. I still don't get your point of adding new distribution
> > >> types.
> > >>
> > >> I think what you need is a new metadata indicating whether there are
> > >> skewed values. By looking it up through RelMetadataQuery, you may
> > >> boost (or even make INF) the cost of a two-pass agg or shuffled join
> > >> and make other implementations win.
> > >>
> > >> Adding new shuffle types is not as simple as it appears to be. You may
> > >> need to adjust the trait passthrough and derivation logic.
> > >> And compared with metadata query, you may need to maintain the skew
> > >> values during operators' implementation rules, which introduces
> > >> difficulties for further maintenance.
> > >>
> > >> Thanks
> > >>
> > >> On Wed, Dec 22, 2021 at 11:05 AM Jing Zhang <beyond1...@gmail.com> wrote:
> > >>
> > >> > Hi Julian,
> > >> > Makes sense.
> > >> > Then a newly added RelDistribution type requires a strong reason.
> > >> > I have created a JIRA [1] to track this requirement.
> > >> >
> > >> > [1] https://issues.apache.org/jira/browse/CALCITE-4957
> > >> >
> > >> > Best,
> > >> > Jing Zhang
> > >> >
> > >> > Julian Hyde <jhyde.apa...@gmail.com> wrote on Wednesday, Dec 22, 2021 at 08:04:
> > >> >
> > >> > > I think you should contribute a change that adds a new value to
> > >> > > the enum. I know that enums are not easily extensible, but in
> > >> > > cases like this, that can be a feature rather than a bug.
> > >> > >
> > >> > > There are not very many distribution types, and new distribution
> > >> > > types are rarely invented. Requiring people to contribute to the
> > >> > > enum is a useful forcing function: the various groups who use
> > >> > > Calcite are forced to describe their use cases, and when people
> > >> > > discover that they have the same use cases, we tend to get
> > >> > > reusable code.
> > >> > >
> > >> > > Converting an enum to an interface makes things a lot less
> > >> > > concrete. It is more difficult to reason about a piece of code,
> > >> > > and there are bugs because you can’t review a ‘switch’ expression
> > >> > > and say ‘yes, that covers all cases’.
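Julian's exhaustiveness argument can be made concrete with a small sketch. The enum values below are a subset of Calcite's actual `RelDistribution.Type` constants, but the `describe` method is purely illustrative: with a closed enum, a reviewer can verify at a glance that the switch covers every case, a check that is impossible if the type were an open interface with subclasses defined elsewhere:

```java
// Illustrative only: with an enum, this switch is verifiably exhaustive.
public class ExhaustiveSwitch {
    enum Type { SINGLETON, HASH_DISTRIBUTED, RANGE_DISTRIBUTED, RANDOM_DISTRIBUTED, ANY }

    static String describe(Type t) {
        switch (t) {
            case SINGLETON:          return "all rows on one node";
            case HASH_DISTRIBUTED:   return "rows hashed on keys";
            case RANGE_DISTRIBUTED:  return "rows split by key ranges";
            case RANDOM_DISTRIBUTED: return "rows scattered randomly";
            case ANY:                return "no guarantee";
            default: throw new AssertionError("unreachable: " + t);
        }
    }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            System.out.println(t + ": " + describe(t));
        }
    }
}
```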
> > >> > >
> > >> > > Julian
> > >> > >
> > >> > > > On Dec 21, 2021, at 12:33 AM, Jing Zhang <beyond1...@gmail.com> wrote:
> > >> > > >
> > >> > > > Hi community,
> > >> > > > I hope to extend `RelDistribution` to support more distribution
> > >> > > > types in order to solve data skew in the normal hash
> > >> > > > distribution.
> > >> > > >
> > >> > > > When we use hash distribution to bring all records with the
> > >> > > > same hash key to the same place, job performance will be poor
> > >> > > > if there exist hot keys.
> > >> > > > There is a solution to this problem: we could send a hot key to
> > >> > > > one of several downstream tasks, chosen at random.
> > >> > > > In HashJoin, we could use random hash partitioning on one side;
> > >> > > > for the other input to the join, records relating to the hot
> > >> > > > key need to be replicated to all downstream tasks handling that
> > >> > > > key.
> > >> > > > In HashAggregate, we could split the aggregate into
> > >> > > > partial-final if all the aggregation functions support
> > >> > > > splitting.
> > >> > > > The 10th chapter of the book "Designing Data-Intensive
> > >> > > > Applications" also describes this solution to data skew.
> > >> > > >
> > >> > > > Anyway, we should first extend `RelDistribution` to support
> > >> > > > more distribution types, for example, a hash-random type.
> > >> > > > However, `RelDistribution.Type` is an enum class, which is not
> > >> > > > extensible.
> > >> > > > I would not add the new types to the enum `RelDistribution.Type`
> > >> > > > directly.
> > >> > > > I prefer to refactor `RelDistribution.Type` to make it
> > >> > > > extensible and add the new types in a subclass in the external
> > >> > > > execution engine (e.g. Flink).
> > >> > > >
> > >> > > > For example, there is a lookup join in Flink.
> > >> > > > It is typically used to enrich a table with data queried from
> > >> > > > an external system [1].
> > >> > > > For the following query
> > >> > > >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> > >> > > >> from default_catalog.default_database.probe as p
> > >> > > >> join partition_table_3 for system_time as of p.proc_time as b
> > >> > > >> on p.x=b.x and p.y=b.y
> > >> > > >
> > >> > > > when using normal hash distribution, the logical plan is as follows:
> > >> > > > +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> > >> > > >   +- Exchange(distribution=[hash[x, y]])
> > >> > > >     +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> > >> > > >
> > >> > > > If the data-skew solution is enabled via the hint, the logical plan is as follows:
> > >> > > > +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
> > >> > > >   +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
> > >> > > >     +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> > >> > > >
> > >> > > > What do you think?
> > >> > > >
> > >> > > > [1]
> > >> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
> > >> > > >
> > >> > > > Best,
> > >> > > > Jing Zhang
> > >> > >
> > >> >
> > >
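The `hash_random(key=[x, y], bucket_num=8)` exchange in the plan above can be sketched as a partitioner that hashes the key to a base channel and then picks one of the next `bucket_num` channels at random. All names and the channel-selection scheme here are illustrative, not Flink's actual internals:

```java
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of a hash-random exchange: each key is spread over bucketNum of
// the numChannels downstream tasks instead of being pinned to one task.
public class HashRandomPartitioner {
    private final int numChannels;  // downstream parallelism
    private final int bucketNum;    // how many tasks may receive one key

    HashRandomPartitioner(int numChannels, int bucketNum) {
        this.numChannels = numChannels;
        this.bucketNum = Math.min(bucketNum, numChannels);
    }

    /** Picks a downstream channel for the given join-key values. */
    int selectChannel(Object... keyValues) {
        int base = Math.floorMod(Objects.hash(keyValues), numChannels);
        int offset = ThreadLocalRandom.current().nextInt(bucketNum);
        return (base + offset) % numChannels;  // one of bucketNum channels
    }

    public static void main(String[] args) {
        HashRandomPartitioner p = new HashRandomPartitioner(16, 8);
        // A hot key no longer sends all of its records to a single task:
        for (int i = 0; i < 5; i++) {
            System.out.println(p.selectChannel("hotX", "hotY"));
        }
    }
}
```

For the lookup join above, only the probe side's partitioning changes, since each task queries the external table independently; for a regular HashJoin, as the Dec 21 mail notes, the other input's records for a hot key would additionally have to be replicated to all tasks handling that key.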