Hi Yang,

Another term used for the optimization that you mention is "selective join
pushdown", which essentially relies on Bloom, Cuckoo, and other probabilistic
filters. You can check [1] for more details about this kind of technique.
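
To make the idea a bit more concrete, here is a minimal sketch in plain Java
(using Guava's BloomFilter, not any Calcite API; the class, method, and
variable names are made up for illustration) of how a filter built on the
small side of a join can prune the other side before the join itself runs:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class SelectiveJoinPushdownSketch {

  /** Builds a Bloom filter over the join keys of the (small) build side and
   *  uses it to discard probe-side keys before the actual join runs. */
  static List<String> pruneProbeSide(List<String> buildKeys, List<String> probeKeys) {
    BloomFilter<String> filter = BloomFilter.create(
        Funnels.stringFunnel(StandardCharsets.UTF_8),
        buildKeys.size(),   // expected insertions
        0.01);              // acceptable false-positive probability
    buildKeys.forEach(filter::put);
    // Keys rejected here can never match the build side, so they are safe to
    // drop; false positives are later removed by the join condition itself.
    return probeKeys.stream()
        .filter(filter::mightContain)
        .collect(Collectors.toList());
  }
}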

In the example that you outlined between JDBC and Elastic, you could maybe
achieve the same result with a slightly different approach by using a
correlated join. If the scan + filter on Elastic does not bring back many
results, then you could use those results to probe the JDBC datasource. For
more details, check the discussion in [2]; I think it refers to the same
problem.
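
Leaving Calcite's actual operators aside, the correlated approach boils down
to something like the sketch below (plain JDBC; the table and column names
are taken from your example, but the method itself is only illustrative):
every key coming out of the already-filtered Elastic scan drives a point
lookup on the JDBC side instead of a full table scan.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class CorrelatedProbeSketch {

  /** Probes the JDBC side once per key returned by the filtered Elastic scan,
   *  instead of scanning and hashing the whole JDBC table. */
  static List<String> probeJdbc(Connection conn, List<String> elasticUsers)
      throws SQLException {
    List<String> matchedEmails = new ArrayList<>();
    String sql = "SELECT \"email\" FROM \"insight_user\".\"user_tab\" WHERE \"email\" = ?";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      for (String user : elasticUsers) {
        ps.setString(1, user);
        try (ResultSet rs = ps.executeQuery()) {
          while (rs.next()) {
            matchedEmails.add(rs.getString(1));
          }
        }
      }
    }
    return matchedEmails;
  }
}

This pays off only when the Elastic side returns few rows; in Calcite terms it
corresponds to a Correlate / nested-loop style plan with the Elastic side as
the outer input.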

Best,
Stamatis

[1] http://www.vldb.org/pvldb/vol12/p502-lang.pdf
[2] https://lists.apache.org/thread.html/d9f95683e66009872a53e7e617295158b98746b550d2bf68230b3096%40%3Cdev.calcite.apache.org%3E

On Sat, Mar 7, 2020 at 4:16 AM Yang Liu <whilg...@gmail.com> wrote:

> Thanks all!
>
> @Julian, is the “split processing into phases” you are referring to
> something like this?
>
> with t1 as (select * from es_table where xxx limit xxx)
> select * from t2 join t1 on (t2.key = t1.key)
> where t2.key in (select key from t1)
>
> which means the SQL writer needs to adapt to this specific form of SQL for
> better performance? And Calcite will cache t1, right?
>
> Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to apply
> the specific rule that the query result of the right table can be used as a
> filter for the left table?
>
> Thanks!
>
>
> Julian Hyde <jh...@apache.org> wrote on Sat, Mar 7, 2020 at 1:48 AM:
>
> > Runtime optimization is always necessary, because you just don’t have the
> > stats until you run the query. The best DB algorithms are adaptive, and
> > therefore hard to write. The adaptations require a lot of tricky support
> > from the runtime - e.g. propagating bloom filters against the flow of data.
> >
> > Calcite can still help a little.
> >
> > One runtime optimization is where you split processing into phases. Only
> > optimize the first part of your query. Build temp tables, analyze them, and
> > use those stats to optimize the second part of your query.
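> >
> > A rough, purely illustrative sketch of that flow in plain JDBC (the temp
> > table, the queries, and the row-count threshold are all made up; nothing
> > here is Calcite API):
> >
> > import java.sql.Connection;
> > import java.sql.ResultSet;
> > import java.sql.SQLException;
> > import java.sql.Statement;
> >
> > public class TwoPhaseSketch {
> >   static void run(Connection conn) throws SQLException {
> >     try (Statement stmt = conn.createStatement()) {
> >       // Phase 1: materialize the first part of the query and analyze it.
> >       stmt.execute("CREATE TEMPORARY TABLE tmp_build AS "
> >           + "SELECT * FROM some_view WHERE status = 'fail'");
> >       long rowCount;
> >       try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM tmp_build")) {
> >         rs.next();
> >         rowCount = rs.getLong(1);
> >       }
> >       // Phase 2: only now plan and run the rest of the query, using the
> >       // fresh stats, e.g. to pick the join strategy.
> >       if (rowCount < 10_000) {
> >         // small build side: e.g. hash join with tmp_build as build input
> >       } else {
> >         // large build side: fall back to a different plan
> >       }
> >     }
> >   }
> > }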
> >
> > Another technique is to gather stats as you run the query today, so that
> > when you run it tomorrow Calcite can do a better job.
> >
> > Julian
> >
> >
> > > On Mar 6, 2020, at 5:52 AM, Danny Chan <danny0...@apache.org> wrote:
> > >
> > > Sorry to say that the Calcite runtime does not support this. The "dynamic
> > > partition pruning", or "runtime filter" as it is called in Impala, builds
> > > a Bloom filter on the join keys of the build-side table and pushes it down
> > > to the probe-side table source; in some cases this can reduce the data
> > > that is read.
> > >
> > > Yang Liu <whilg...@gmail.com> wrote on Fri, Mar 6, 2020 at 6:54 PM:
> > >
> > >> I discussed this with one of our user groups: in Spark 3.0, this is
> > >> called "dynamic partition pruning".
> > >>
> > >> Yang Liu <whilg...@gmail.com> wrote on Fri, Mar 6, 2020 at 6:12 PM:
> > >>
> > >>> Hi,
> > >>>
> > >>> I am wondering if Calcite will support "lazy optimization" (execution
> > >>> time optimization / runtime optimization).
> > >>>
> > >>> For example, we want to do an inner join between an Elasticsearch table
> > >>> and a MySQL table, like this:
> > >>>
> > >>> WITH logic_table_2 AS
> > >>>  (SELECT _MAP['status'] AS "status",
> > >>>          _MAP['user'] AS "user"
> > >>>   FROM "es"."insight-by-sql-v3"
> > >>>   LIMIT 12345)
> > >>> SELECT *
> > >>> FROM "insight_user"."user_tab" AS t1
> > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > >>> WHERE t2."status" = 'fail'
> > >>> LIMIT 10
> > >>>
> > >>> t2 is an ES table and t1 is a MySQL table, and it may generate an
> > >>> execution plan like this:
> > >>>
> > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5], is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0], user=[$1])
> > >>>   EnumerableLimit(fetch=[10])
> > >>>     EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > >>>       ElasticsearchToEnumerableConverter
> > >>>         ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0, 'user')])
> > >>>           ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> > >>>             ElasticsearchSort(fetch=[12345])
> > >>>               ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > >>>       JdbcToEnumerableConverter
> > >>>         JdbcTableScan(table=[[insight_user, user_tab]])
> > >>>
> > >>> Since the ES query has a filter here, at execution time Calcite will run
> > >>> the ES query first to get the build table, then do the JdbcTableScan to
> > >>> get the probe table, and finally do the HashJoin.
> > >>>
> > >>> But since this is an INNER JOIN, there is an implicit filter on the later
> > >>> JdbcTableScan:
> > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```
> > >>> If we applied this implicit filter, the dataset we handle could become
> > >>> extremely small (saving memory) and the query would run much faster, since
> > >>> the full JdbcTableScan is always time-consuming. But since Calcite does the
> > >>> optimization in the planner phase, this dynamic/lazy optimization seems to
> > >>> be missed ...
> > >>>
> > >>> To summarize, serial execution with a "lazy optimization" may be faster
> > >>> and use less memory than parallel execution with an optimized execution
> > >>> plan, since the former can reduce the dataset we handle.
> > >>>
> > >>> Any ideas?
> > >>>
> > >>>
> > >>>
> > >>
> >
> >
>
