Hi Yang,

Another term used for the optimization that you mention is "selective join pushdown", which essentially relies on Bloom, Cuckoo, and other probabilistic filters. You can check [1] for more details about this kind of technique.
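As a rough, self-contained illustration (not Calcite code), the idea behind a selective join pushdown / runtime filter is: build a small probabilistic filter over the join keys that survive the selective side, then use it to discard probe-side rows before the join ever sees them. The `BloomFilter` class and all the data below are invented for the sketch:

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: k hash positions over an m-bit array.

    No false negatives; a small chance of false positives, which is
    fine here because the join itself re-checks the condition."""

    def __init__(self, m=1024, k=3):
        self.m, self.k = m, k
        self.bits = 0

    def _positions(self, item):
        # Derive k positions by salting a cryptographic hash (simple, not fast).
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.m

    def add(self, item):
        for p in self._positions(item):
            self.bits |= 1 << p

    def might_contain(self, item):
        return all((self.bits >> p) & 1 for p in self._positions(item))

# Build side: join keys that survived the selective filter
# (e.g. the ES rows with status = 'fail').
build_keys = ["alice@example.com", "bob@example.com"]
bf = BloomFilter()
for key in build_keys:
    bf.add(key)

# Probe side: rows are skipped before the join if the filter rules them out,
# which is what "pushing the filter down to the probe source" buys you.
probe_rows = ["alice@example.com", "carol@example.com", "bob@example.com"]
survivors = [row for row in probe_rows if bf.might_contain(row)]
```

In a real engine the filter would be serialized and pushed into the probe-side scan (or even into the remote datasource) rather than applied in a list comprehension, but the data-reduction effect is the same.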
In the example that you outlined between JDBC and Elastic, maybe you could achieve the same result with a slightly different approach by using a correlated join. If the scan + filter on Elastic does not bring back many results, then you could use those results to probe the JDBC datasource. For more details check the discussion in [2]; I think it refers to the same problem.

Best,
Stamatis

[1] http://www.vldb.org/pvldb/vol12/p502-lang.pdf
[2] https://lists.apache.org/thread.html/d9f95683e66009872a53e7e617295158b98746b550d2bf68230b3096%40%3Cdev.calcite.apache.org%3E

On Sat, Mar 7, 2020 at 4:16 AM Yang Liu <whilg...@gmail.com> wrote:
> Thanks all!
>
> @Julian is the "split processing into phases" you are referring to like
> this?
>
> with t1 as (select * from es_table where xxx limit xxx)
> select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> from t1)
>
> which means the SQL writer needs to adapt to this specific form of SQL for
> better performance? And Calcite will cache t1, right?
>
> Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to have
> a specific rule: the query result of the right table can be used as a
> filter for the left table?
>
> Thanks!
>
> Julian Hyde <jh...@apache.org> wrote on Sat, Mar 7, 2020 at 1:48 AM:
> >
> > Runtime optimization is always necessary, because you just don't have the
> > stats until you run the query. The best DB algorithms are adaptive, and
> > therefore hard to write. The adaptations require a lot of tricky support
> > from the runtime - e.g. propagating Bloom filters against the flow of
> > data.
> >
> > Calcite can still help a little.
> >
> > One runtime optimization is where you split processing into phases. Only
> > optimize the first part of your query. Build temp tables, analyze them,
> > and use those stats to optimize the second part of your query.
> >
> > Another technique is to gather stats as you run the query today, so
> > that when you run it tomorrow Calcite can do a better job.
> >
> > Julian
> >
> > > On Mar 6, 2020, at 5:52 AM, Danny Chan <danny0...@apache.org> wrote:
> > >
> > > Sorry to say that the Calcite runtime does not support this. "Dynamic
> > > partition pruning" (or "runtime filter", as it is called in Impala)
> > > would build a Bloom filter on the join keys of the build-side table and
> > > push it down to the probe-side table source; thus, in some cases, it
> > > can reduce the data.
> > >
> > > Yang Liu <whilg...@gmail.com> wrote on Fri, Mar 6, 2020 at 6:54 PM:
> > >
> > >> Having discussed this with one of our user groups: in Spark 3.0, this
> > >> is called "dynamic partition pruning".
> > >>
> > >> Yang Liu <whilg...@gmail.com> wrote on Fri, Mar 6, 2020 at 6:12 PM:
> > >>
> > >>> Hi,
> > >>>
> > >>> I am wondering if Calcite will support "lazy optimization"
> > >>> (execution-time optimization / runtime optimization).
> > >>>
> > >>> For example, we want to do an inner join between an Elasticsearch
> > >>> table and a MySQL table, like this:
> > >>>
> > >>> WITH logic_table_2 AS
> > >>>   (SELECT _MAP['status'] AS "status",
> > >>>           _MAP['user'] AS "user"
> > >>>    FROM "es"."insight-by-sql-v3"
> > >>>    LIMIT 12345)
> > >>> SELECT *
> > >>> FROM "insight_user"."user_tab" AS t1
> > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > >>> WHERE t2."status" = 'fail'
> > >>> LIMIT 10
> > >>>
> > >>> t2 is an ES table and t1 is a MySQL table, and it may generate an
> > >>> execution plan like this:
> > >>>
> > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > >>>     is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > >>>     user=[$1])
> > >>>   EnumerableLimit(fetch=[10])
> > >>>     EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > >>>       ElasticsearchToEnumerableConverter
> > >>>         ElasticsearchProject(status=[ITEM($0, 'status')],
> > >>>             user=[ITEM($0, 'user')])
> > >>>           ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> > >>>             ElasticsearchSort(fetch=[12345])
> > >>>               ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > >>>       JdbcToEnumerableConverter
> > >>>         JdbcTableScan(table=[[insight_user, user_tab]])
> > >>>
> > >>> Since the ES query here has a filter, during execution Calcite will do
> > >>> the ES query first and get the build table, then do the JdbcTableScan
> > >>> and get the probe table, and finally do the HashJoin.
> > >>>
> > >>> But since this is an INNER JOIN, there is an implicit filter on the
> > >>> later JdbcTableScan:
> > >>> ``` t1.email in (select user from t2 where t2.status = 'fail') ```
> > >>> If this implicit filter were applied, the dataset we handle might
> > >>> become extremely small (saving memory) and run much faster, since the
> > >>> full JdbcTableScan is always time-consuming. But since Calcite does
> > >>> its optimization in the planner phase, this dynamic/lazy optimization
> > >>> seems to be missed ...
> > >>>
> > >>> To summarize, serial execution with "lazy optimization" may be faster
> > >>> and use less memory than parallel execution with an optimized
> > >>> execution plan, since the former can reduce the dataset we handle.
> > >>>
> > >>> Any ideas?
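To make the correlated-join / split-into-phases suggestion concrete: run the selective side first, collect its join keys, then issue a second query against the other source with those keys as an explicit IN filter, so the full scan never happens. The sketch below uses Python's sqlite3 as a stand-in for the JDBC source and a hard-coded list as a stand-in for the Elasticsearch result; the table, columns, and data are all invented for the example:

```python
import sqlite3

# Stand-in for the JDBC source (hypothetical schema echoing user_tab).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_tab (email TEXT, name TEXT)")
conn.executemany(
    "INSERT INTO user_tab VALUES (?, ?)",
    [("a@x.com", "Ann"), ("b@x.com", "Bob"), ("c@x.com", "Cy")],
)

# Phase 1: execute the selective side first. This list stands in for the
# result of the Elasticsearch query (SELECT user ... WHERE status = 'fail').
es_users = ["a@x.com", "c@x.com"]

# Phase 2: push the phase-1 result into the second source as an IN filter,
# so only matching rows are fetched instead of doing a full table scan.
placeholders = ",".join("?" * len(es_users))
rows = conn.execute(
    f"SELECT email, name FROM user_tab WHERE email IN ({placeholders})",
    es_users,
).fetchall()
```

The same two-phase shape works whenever the first phase is selective enough that its key set fits in an IN list (or a temp table); if it is not, the Bloom-filter pushdown discussed earlier in the thread is the more scalable variant.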