Yes, if you use totally the Calcite runtime, modify the queries is a way to 
avoid huge memory consumption.

Best,
Danny Chan
在 2020年2月21日 +0800 PM3:52,Yang Liu <whilg...@gmail.com>,写道:
> Thanks Danny,
>
> multiplying row count and factor seems not help. In following query
>
> PLAN: EnumerableLimit(fetch=[10])
> EnumerableHashJoin(condition=[=($2, $8)], joinType=[inner])
> JdbcToEnumerableConverter
> JdbcTableScan(table=[[insight_user, user_tab]])
> ElasticsearchToEnumerableConverter
> ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
> 'user')])
> ElasticsearchTableScan(table=[[es, es_table_1]])
> {10.0 rows, 828.9306334778566 cpu, 0.0 io}
>
> The main factor affecting performance is the ElasticsearchTableScan,
> although final row count is 10, intermediate result could be quite huge.
>
> If I am not wrong, when doing aggregation, Calcite provides two ways: in
> memory and in Spark. We just use the `in memory` option, so we care about
> the memory usage.
> If there is no internal solution I think we may need to parse the SQL
> first, if there is no limit or limit is too huge in the SQL, we reject the
> execution or rewrite the LIMIT part...
>
> Thanks
>
>
>
> Danny Chan <yuzhao....@gmail.com> 于2020年2月20日周四 下午10:22写道:
>
> > Sorry to tell that Calcite runtime is not designed to query bigdata. It
> > actually executes all the operator in single task.
> >
> > In distributed systems like Spark or Flink, we usually fallback to merge
> > join instead of hash when the data is huge.
> >
> > Danny Chan <yuzhao....@gmail.com>于2020年2月20日 周四下午10:16写道:
> >
> > > You are right, we have only row count into the source. There is no meta
> > > can describe the query performance. Can you multiply the row count with a
> > > explicit factor?
> > >
> > > Yang Liu <whilg...@gmail.com>于2020年2月20日 周四下午5:05写道:
> > >
> > > > Currently I can use following code to getCost,
> > > >
> > > > def getCost(rootSchema: SchemaPlus, sql: String): RelOptCost = {
> > > > val configBuilder = Frameworks.newConfigBuilder()
> > > > configBuilder.defaultSchema(rootSchema)
> > > > val frameworkConfig = configBuilder.build()
> > > > val planner = Frameworks.getPlanner(frameworkConfig)
> > > >
> > > > val sqlNode = planner.parse(sql)
> > > > val validate = planner.validate(sqlNode)
> > > > val rel = planner.rel(validate).project()
> > > > val mq = rel.getCluster().getMetadataQuery
> > > > mq.getNonCumulativeCost(rel)
> > > > }
> > > >
> > > > but seems not very correct, for example,
> > > >
> > > > between two mysql
> > > > PLAN: EnumerableCalc(expr#0..3=[{inputs}], proj#0..2=[{exprs}])
> > > > EnumerableHashJoin(condition=[=($0, $3)], joinType=[inner])
> > > > JdbcToEnumerableConverter
> > > > JdbcTableScan(table=[[perm, user_table]])
> > > > JdbcToEnumerableConverter
> > > > JdbcProject(id=[$0])
> > > > JdbcTableScan(table=[[insight_user, user_tab]])
> > > > {1500.0 rows, 4500.0 cpu, 0.0 io}
> > > >
> > > >
> > > > between es and mysql
> > > > PLAN: EnumerableLimit(fetch=[10])
> > > > EnumerableHashJoin(condition=[=($2, $8)], joinType=[inner])
> > > > JdbcToEnumerableConverter
> > > > JdbcTableScan(table=[[insight_user, user_tab]])
> > > > ElasticsearchToEnumerableConverter
> > > > ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
> > > > 'user')])
> > > > ElasticsearchTableScan(table=[[es, es_table_1]])
> > > > {10.0 rows, 828.9306334778566 cpu, 0.0 io}
> > > >
> > > > in practice the later one is much slower than the first one since the es
> > > > table is very large.
> > > >
> > > > Seems the planner cost is a logical estimate, may I know the correct
> > > > usage of it?
> > > >
> > > > Thanks
> > > >
> > > > On 2020/02/20 03:00:47, Yang Liu <whilg...@gmail.com> wrote:
> > > > > Hi all,
> > > > >
> > > > > Dive into docs and I find the getJoinRowCount
> > > > > <
> > > >
> > https://calcite.apache.org/apidocs/org/apache/calcite/rel/metadata/RelMdUtil.html#getJoinRowCount(org.apache.calcite.rel.metadata.RelMetadataQuery,org.apache.calcite.rel.core.Join,org.apache.calcite.rex.RexNode)
> > > > >
> > > > > seems the very thing I am looking for, I can reject the join when the
> > > > rows
> > > > > are too many. Any ideas?
> > > > >
> > > > > Yang Liu <whilg...@gmail.com> 于2020年2月17日周一 下午5:13写道:
> > > > >
> > > > > > Thanks Muhammad, can help give more detailed description?
> > > > > > Currently I have searched a RefOptCost, is this the one you are
> > > > referring
> > > > > > to?
> > > > > > And I need to clarify, I do not mind the cost happened in
> > datasources
> > > > > > since that will not affect my application, may just take a longer
> > > > time to
> > > > > > get the result if the cost is high.
> > > > > > I care about the cost in our application and afraid of OOM.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > Muhammad Gelbana <mgelb...@apache.org> 于2020年2月17日周一 上午4:26写道:
> > > > > >
> > > > > > > If your only concern is about memory utilization, I would try
> > > > estimating
> > > > > > > this using the plan's cost. But I guess you'll have run some tests
> > to
> > > > > > > estimate the ranges you can accept.
> > > > > > >
> > > > > > >
> > > > > > > On Sun, Feb 16, 2020 at 5:50 PM Yang Liu <whilg...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > > Is it possible to have some limitations on the SQLs to make sure
> > > > our
> > > > > > > > application which depends on Calcite is "safe"? For example, 
> > > > > > > > when
> > > > do
> > > > > > > merge
> > > > > > > > joining between 2 large datasources, our application maybe OOM
> > > > since the
> > > > > > > > joining process is in memory. If we have the "limitation
> > > > mechanism", we
> > > > > > > can
> > > > > > > > refuse to execute the joining to avoid OOM.
> > > > > > > >
> > > > > > > > Or we can only do the check outside Calcite?
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >

Reply via email to