It's difficult to do "split processing into phases" entirely within
Calcite. Generally a DBMS would manage these phases, and call Calcite
at each step.
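
To make the idea concrete, here is a minimal sketch of such a driver. It is not Calcite code: two in-memory SQLite databases stand in for the two sources, and every name in it (make_sources, phase_one, phase_two, max_in_list, the tables and columns) is hypothetical. Phase one runs only the selective side and collects stats; phase two uses those stats to pick the second query.

```python
import sqlite3


def make_sources():
    """Create two in-memory databases standing in for two separate sources."""
    es = sqlite3.connect(":memory:")  # stand-in for the selective source
    es.execute("CREATE TABLE events (user_email TEXT, status TEXT)")
    es.executemany(
        "INSERT INTO events VALUES (?, ?)",
        [("a@x.org", "fail"), ("b@x.org", "ok"), ("c@x.org", "fail")],
    )
    db = sqlite3.connect(":memory:")  # stand-in for the large source
    db.execute("CREATE TABLE user_tab (id INTEGER, email TEXT)")
    db.executemany(
        "INSERT INTO user_tab VALUES (?, ?)",
        [(i, f"{chr(97 + i)}@x.org") for i in range(26)],
    )
    return es, db


def phase_one(es):
    """Run only the selective side and gather stats on its result."""
    rows = es.execute(
        "SELECT user_email, status FROM events WHERE status = 'fail'"
    ).fetchall()
    stats = {
        "row_count": len(rows),
        "distinct_keys": len({r[0] for r in rows}),
    }
    return rows, stats


def phase_two(db, rows, stats, max_in_list=500):
    """Pick the second query from the phase-one stats, then join in memory."""
    if stats["distinct_keys"] == 0:
        return []  # an inner join with an empty side is empty
    keys = sorted({r[0] for r in rows})
    if stats["distinct_keys"] <= max_in_list:
        # Few keys: push them down as an IN-list filter on the other source.
        marks = ",".join("?" * len(keys))
        users = db.execute(
            f"SELECT id, email FROM user_tab WHERE email IN ({marks})", keys
        ).fetchall()
    else:
        # Too many keys for an IN list: fall back to a full scan.
        users = db.execute("SELECT id, email FROM user_tab").fetchall()
    by_email = {}
    for uid, email in users:
        by_email.setdefault(email, []).append(uid)
    return [
        (uid, email, status)
        for email, status in rows
        for uid in by_email.get(email, [])
    ]
```

Calling phase_two(db, *phase_one(es)) returns the joined rows. The max_in_list threshold is an arbitrary assumption; a real system would derive it from cost estimates and from the limits of the target database.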

I'd love to have some working code in Calcite that does this, but we'd
be stepping over the line from "framework" into "platform". It's
difficult to get contributions for these things.

A more realistic ask is this: If anyone has implemented multi-phase
optimization with Calcite, please write a blog post or a conference
talk and share what you learned. Include a pointer to your code if
your project is open source.

Julian

On Fri, Mar 6, 2020 at 7:16 PM Yang Liu <whilg...@gmail.com> wrote:
>
> Thanks all!
>
> @Julian, is the “split processing into phases” you are referring to
> something like this?
>
> with t1 as (select * from es_table where xxx limit xxx)
> select * from t2 join t1 on (t2.key = t1.key)
> where t2.key in (select key from t1)
>
> Does this mean the SQL writer needs to adapt to this specific form of SQL
> for better performance? And Calcite will cache t1, right?
>
> Or maybe I can implement a RelRunner or EnumerableHashJoin myself with a
> specific rule: the query result of the right table is used as a filter
> on the left table?
>
> Thanks!
>
>
> On Sat, Mar 7, 2020 at 1:48 AM Julian Hyde <jh...@apache.org> wrote:
>
> > Runtime optimization is always necessary, because you just don’t have the
> > stats until you run the query. The best DB algorithms are adaptive, and
> > therefore hard to write. The adaptations require a lot of tricky support
> > from the runtime - e.g. propagating bloom filters against the flow of data.
> >
> > Calcite can still help a little.
> >
> > One runtime optimization is where you split processing into phases. Only
> > optimize the first part of your query. Build temp tables, analyze them, and
> > use those stats to optimize the second part of your query.
> >
> > Another technique is to gather stats as you run the query today, so
> > that when you run it tomorrow Calcite can do a better job.
> >
> > Julian
> >
> >
> > > On Mar 6, 2020, at 5:52 AM, Danny Chan <danny0...@apache.org> wrote:
> > >
> > > Sorry to say that the Calcite runtime does not support this. "Dynamic
> > > partition pruning", or "runtime filter" as it is called in Impala, builds
> > > a bloom filter on the join keys of the build-side table and pushes it
> > > down to the probe-side table source, which in some cases reduces the
> > > amount of data.
> > >
> > > On Fri, Mar 6, 2020 at 6:54 PM Yang Liu <whilg...@gmail.com> wrote:
> > >
> > >> I discussed this with one of our user groups; in Spark 3.0, this is
> > >> called "dynamic partition pruning".
> > >>
> > >> On Fri, Mar 6, 2020 at 6:12 PM Yang Liu <whilg...@gmail.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I am wondering if Calcite will support "lazy optimization"
> > >>> (execution-time optimization / runtime optimization).
> > >>>
> > >>> For example, we want to do an inner join between an Elasticsearch table
> > >>> and a MySQL table, like this:
> > >>>
> > >>> WITH logic_table_2 AS
> > >>>  (SELECT _MAP['status'] AS "status",
> > >>>          _MAP['user'] AS "user"
> > >>>   FROM "es"."insight-by-sql-v3"
> > >>>   LIMIT 12345)
> > >>> SELECT *
> > >>> FROM "insight_user"."user_tab" AS t1
> > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > >>> WHERE t2."status" = 'fail'
> > >>> LIMIT 10
> > >>>
> > >>> t2 is an ES table and t1 is a MySQL table, and Calcite may generate an
> > >>> execution plan like this:
> > >>>
> > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5], is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0], user=[$1])
> > >>>  EnumerableLimit(fetch=[10])
> > >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > >>>      ElasticsearchToEnumerableConverter
> > >>>        ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0, 'user')])
> > >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> > >>>            ElasticsearchSort(fetch=[12345])
> > >>>              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > >>>      JdbcToEnumerableConverter
> > >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> > >>>
> > >>> Since the ES query has a filter, at execution time Calcite will run the
> > >>> ES query first to get the build table, then run the JdbcTableScan to
> > >>> get the probe table, and finally do the HashJoin.
> > >>>
> > >>> But since this is an INNER JOIN, there is an implicit filter on the
> > >>> later JdbcTableScan:
> > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```
> > >>> If this implicit filter were applied, the dataset we handle could become
> > >>> extremely small (saving memory) and the query would run much faster,
> > >>> since a full JdbcTableScan is always time-consuming. But since Calcite
> > >>> does its optimization in the planner phase, this dynamic/lazy
> > >>> optimization seems to be missed ...
> > >>>
> > >>> To summarize, serial execution with a "lazy optimization" may be faster
> > >>> and use less memory than parallel execution with an optimized execution
> > >>> plan, since the former can reduce the dataset we handle.
> > >>>
> > >>> Any ideas?
> > >>>
> > >>>
> > >>>
> > >>
> >
> >
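
The "runtime filter" described in the thread (build a bloom filter on the build-side join keys, then apply it to the probe side before joining) can be sketched roughly as follows. This is an illustration in plain Python, not Calcite or Impala code; the class BloomFilter, the function filtered_hash_join, and the sizing parameters are all assumptions made for the example.

```python
import hashlib


class BloomFilter:
    """A small bloom filter; num_bits and num_hashes are arbitrary choices."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive num_hashes bit positions by salting the key with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        # False means definitely absent; True may be a false positive.
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


def filtered_hash_join(build_rows, probe_rows, build_key, probe_key):
    """Hash join that first prunes probe rows with a bloom filter."""
    bloom = BloomFilter()
    table = {}
    for row in build_rows:
        key = row[build_key]
        bloom.add(key)
        table.setdefault(key, []).append(row)
    # In a real engine the filter would be shipped to the probe-side scan,
    # so pruned rows would never leave the source. Here we filter locally.
    survivors = [r for r in probe_rows if bloom.might_contain(r[probe_key])]
    joined = [(p, b) for p in survivors for b in table.get(p[probe_key], [])]
    return joined, len(survivors)
```

A bloom filter has no false negatives, so the join result is the same as without the filter; the second return value shows how many probe rows survived pruning.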
