Sorry for the typo: "it would affect existing SQL behavior" should read "it would not affect existing SQL behavior".
Jing Zhang <beyond1...@gmail.com> wrote on Wed, Dec 22, 2021 at 12:36:

> Hi Jinpeng,
> Thanks for the response.
> I think we are describing different solutions to data skew; please correct
> me if I am wrong.
>
> What you describe covers cases where the hot keys are known in advance and
> stored in metadata, so we could query whether there is data skew, and which
> values are skewed, from RelMetadataQuery. That optimization is used in Hive.
>
> What I describe is an alternative approach for cases where the hot keys
> cannot be listed in advance. Like the hot words of a search engine, we know
> the phenomenon exists, but there is no way to enumerate the hot words
> beforehand.
> To avoid data skew under the normal hash distribution, we introduce a
> special hash distribution that sends records with the same hash key to one
> of N downstream tasks instead of always the same one.
>
> I add a new distribution type, but it is only triggered under specified
> conditions, so it would not affect existing SQL behavior.
> For example, the user could enable this optimization with a hint, like the
> following:
>
> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
> from default_catalog.default_database.probe as p
> join partition_table_3 /*+ PARTITIONED_JOIN('BUCKET_NUM'= '16') */ for
> system_time as of p.proc_time as b
> on p.x=b.x and p.y=b.y
>
> WDYT?
>
> Best,
> Jing Zhang
>
> Jinpeng Wu <wjpabc...@gmail.com> wrote on Wed, Dec 22, 2021 at 11:45:
>
>> Hi, Jing. I still don't get your point of adding new distribution types.
>>
>> I think what you need is new metadata indicating whether there are skewed
>> values. By looking it up through RelMetadataQuery, you may boost (or even
>> make infinite) the cost of a two-pass agg or shuffled join and make other
>> implementations win.
>>
>> Adding new shuffle types is not as simple as it appears to be. You may
>> need to adjust the traits passthrough and derivation logic.
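The "hash_random" scheme described above (spread records with the same key over one of N downstream tasks) can be sketched roughly as follows. This is an illustrative Java sketch only; `SkewAwarePartitioner`, the consecutive-channel window layout, and all other names are assumptions made for the example, not Flink's or Calcite's actual API.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Illustrative sketch of a "hash_random" distribution. A plain hash
 * distribution routes every record with the same key to one channel;
 * this variant spreads each key over a window of bucketNum channels
 * and picks one of them at random per record.
 */
class SkewAwarePartitioner {

    /** Plain hash distribution: all records with the same key hit one channel. */
    static int hashChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    /**
     * hash_random(key, bucketNum): the candidate channels for a key are the
     * bucketNum channels starting at its hash slot; a random offset in
     * [0, bucketNum) picks one, so a hot key is split across bucketNum
     * channels instead of overloading a single one.
     */
    static int hashRandomChannel(Object key, int numChannels, int bucketNum, int offset) {
        int base = Math.floorMod(key.hashCode(), numChannels);
        return (base + Math.floorMod(offset, bucketNum)) % numChannels;
    }

    public static void main(String[] args) {
        int offset = ThreadLocalRandom.current().nextInt(4);
        System.out.println("hot-key routed to channel "
                + hashRandomChannel("hot-key", 16, 4, offset));
    }
}
```

Note that, as the thread points out for the HashJoin case, records from the other join input that match a hot key would then have to be replicated to all bucketNum channels in that key's window.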
>> And compared with the metadata query, you may need to maintain the skew
>> values inside the operators' implementation rules, which makes further
>> maintenance harder.
>>
>> Thanks
>>
>> On Wed, Dec 22, 2021 at 11:05 AM Jing Zhang <beyond1...@gmail.com> wrote:
>>
>> > Hi Julian,
>> > Makes sense.
>> > Then a newly added RelDistribution type requires a strong reason.
>> > I have created a JIRA [1] to track this requirement.
>> >
>> > [1] https://issues.apache.org/jira/browse/CALCITE-4957
>> >
>> > Best,
>> > Jing Zhang
>> >
>> > Julian Hyde <jhyde.apa...@gmail.com> wrote on Wed, Dec 22, 2021 at 08:04:
>> >
>> > > I think you should contribute a change that adds a new value to the
>> > > enum. I know that enums are not easily extensible, but in cases like
>> > > this, that can be a feature rather than a bug.
>> > >
>> > > There are not very many distribution types, and new distribution
>> > > types are rarely invented. Requiring people to contribute to the enum
>> > > is a useful forcing function: the various groups who use Calcite are
>> > > forced to describe their use cases, and when people discover that
>> > > they have the same use cases, we tend to get reusable code.
>> > >
>> > > Converting an enum to an interface makes things a lot less concrete.
>> > > It is more difficult to reason about a piece of code, and there are
>> > > bugs, because you can't review a 'switch' expression and say 'yes,
>> > > that covers all cases'.
>> > >
>> > > Julian
>> > >
>> > > > On Dec 21, 2021, at 12:33 AM, Jing Zhang <beyond1...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > Hi community,
>> > > > I hope to extend `RelDistribution` to support more distribution
>> > > > types in order to address data skew under the normal hash
>> > > > distribution.
>> > > >
>> > > > When we use a hash distribution to bring all records with the same
>> > > > hash key to the same place, job performance is poor if there are
>> > > > hot keys.
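Julian's remark above about reviewing a 'switch' expression can be illustrated with a small sketch. The `Type` enum here is a trimmed stand-in for Calcite's actual `RelDistribution.Type`, and `shuffleCost` with its values is made up for the example; the point is that a Java switch *expression* (Java 14+) over an enum fails to compile if a constant is missing, so adding a new distribution type to the enum forces every such site to be revisited.

```java
/**
 * Stand-in for Calcite's RelDistribution.Type, trimmed to four constants.
 * With an enum, a switch expression must cover every constant: adding a
 * new type makes every such switch fail to compile until it is handled,
 * which is the "forcing function" Julian describes.
 */
class DistributionSwitch {

    enum Type { SINGLETON, HASH_DISTRIBUTED, BROADCAST_DISTRIBUTED, RANDOM_DISTRIBUTED }

    /** Made-up relative shuffle cost; the compiler rejects a missing case. */
    static int shuffleCost(Type type) {
        return switch (type) {
            case SINGLETON             -> 1;
            case RANDOM_DISTRIBUTED    -> 2;
            case HASH_DISTRIBUTED      -> 4;
            case BROADCAST_DISTRIBUTED -> 8;
        };
    }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            System.out.println(t + " -> cost " + shuffleCost(t));
        }
    }
}
```

An interface-based hierarchy offers no such compile-time check, which is the concreteness that Julian argues would be lost.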
>> > > > There is a solution to this problem: we could send a hot key to
>> > > > one of several downstream tasks, chosen at random.
>> > > > In HashJoin, we could use this random hash partitioning on one
>> > > > side; for the other join input, records relating to the hot key
>> > > > need to be replicated to all downstream tasks handling that key.
>> > > > In HashAggregate, we could split the aggregate into partial/final
>> > > > phases if all the aggregate functions support splitting.
>> > > > Chapter 10 of the book "Designing Data-Intensive Applications"
>> > > > also describes this solution to data skew.
>> > > >
>> > > > Anyway, we should first extend `RelDistribution` to support more
>> > > > distribution types, for example a hash-random type.
>> > > > However, `RelDistribution.Type` is an enum class, which is not
>> > > > extensible.
>> > > > I would not add the new types to the enum `RelDistribution.Type`
>> > > > directly.
>> > > > I prefer to refactor `RelDistribution.Type` to make it extensible
>> > > > and add the new types in a subclass in the external execution
>> > > > engine (e.g. Flink).
>> > > >
>> > > > For example, there is a lookup join in Flink [1], which is
>> > > > typically used to enrich a table with data queried from an
>> > > > external system.
>> > > > For the following query:
>> > > >> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day
>> > > >> from default_catalog.default_database.probe as p
>> > > >> join partition_table_3 for system_time as of p.proc_time as b
>> > > >> on p.x=b.x and p.y=b.y
>> > > >
>> > > > When using the normal hash distribution.
>> > > > The logical plan is as follows:
>> > > > +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x,
>> > > > y, x0, y0, z, pt_year, pt_mon, pt_day])
>> > > >    +- Exchange(distribution=[hash[x, y]])
>> > > >       +- TableSourceScan(table=[[default_catalog, default_database,
>> > > > probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
>> > > >
>> > > > If the data-skew solution is enabled via the hint, the logical plan
>> > > > is as follows:
>> > > > +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x,
>> > > > y, x0, y0, z, pt_year, pt_mon, pt_day])
>> > > >    +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
>> > > >       +- TableSourceScan(table=[[default_catalog, default_database,
>> > > > probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
>> > > >
>> > > > What do you think?
>> > > >
>> > > > [1]
>> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>> > > >
>> > > > Best,
>> > > > Jing Zhang
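The partial/final aggregate split mentioned in the thread can be sketched as follows. This is an illustrative Java sketch; the salting scheme and all names (`TwoPhaseAggregate`, `partial`, `finalAgg`) are made up for the example. A skew-prone SUM is first aggregated per (key, salt) pair, so a hot key's work is spread over several partial tasks, and the partial sums are then combined per key.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative two-phase (partial/final) SUM aggregation for skewed keys.
 * Splitting is possible because SUM is decomposable: the final phase can
 * combine partial sums with another SUM.
 */
class TwoPhaseAggregate {

    /** Phase 1: aggregate per (key, salt); the salt is e.g. the partial task id. */
    static Map<String, Long> partial(String[] keys, long[] values, int[] salts) {
        Map<String, Long> acc = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            acc.merge(keys[i] + "#" + salts[i], values[i], Long::sum);
        }
        return acc;
    }

    /** Phase 2: strip the salt and combine the partial sums per key. */
    static Map<String, Long> finalAgg(Map<String, Long> partials) {
        Map<String, Long> acc = new HashMap<>();
        partials.forEach((saltedKey, sum) ->
            acc.merge(saltedKey.substring(0, saltedKey.indexOf('#')), sum, Long::sum));
        return acc;
    }

    public static void main(String[] args) {
        String[] keys = {"hot", "hot", "hot", "cold"};
        long[] values = {1, 2, 3, 10};
        int[] salts = {0, 1, 0, 0}; // the hot key is spread over two partial tasks
        System.out.println(finalAgg(partial(keys, values, salts))); // sums: hot=6, cold=10
    }
}
```

Only aggregate functions whose partial results can be recombined (SUM, COUNT, MIN, MAX; AVG via SUM/COUNT) support this split, which is why the thread conditions the optimization on "all the aggregate functions support splitting".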