The problem is probably not the rule itself (individual rules tend to run very quickly) but the cost of a planner phase that has a lot of rules enabled, which collectively generate a lot of possibilities. Planning complexity is a dragon that is difficult to slay.

One trick is to use the HepPlanner to apply a small set of rules, without reference to cost.
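(For illustration, a minimal sketch of driving a HepPlanner over a fixed rule list; the helper method and the particular rules chosen are arbitrary examples, not something from this thread, but HepProgramBuilder/HepPlanner are the standard Calcite classes.)

  import org.apache.calcite.plan.hep.HepPlanner;
  import org.apache.calcite.plan.hep.HepProgram;
  import org.apache.calcite.plan.hep.HepProgramBuilder;
  import org.apache.calcite.rel.RelNode;
  import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
  import org.apache.calcite.rel.rules.ProjectMergeRule;

  public class HepExample {
    /**
     * Applies a small, fixed set of rules heuristically. HepPlanner fires
     * the rules in program order and does not compare costs, so it avoids
     * the combinatorial search that VolcanoPlanner performs.
     */
    static RelNode applyHeuristicRules(RelNode input) {
      HepProgram program = new HepProgramBuilder()
          .addRuleInstance(ProjectMergeRule.INSTANCE)
          .addRuleInstance(FilterProjectTransposeRule.INSTANCE)
          .build();
      HepPlanner planner = new HepPlanner(program);
      planner.setRoot(input);
      return planner.findBestExp();
    }
  }

A common arrangement is to run such a heuristic pass first, so the cost-based phase starts from a smaller search space with fewer rules enabled.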
Are you intending to contribute the rule to Calcite?

Julian

> On Jan 9, 2017, at 12:46 AM, weijie tong <[email protected]> wrote:
>
> hi all:
> I have written a Drill rule to push count(distinct) queries down to Druid,
> but it takes 2~7 seconds to go through the Volcano physical planning.
> Are there any tips for writing a better-performing rule?
>
> I have implemented two rules:
>
> 1st:
>
>   agg->agg->scan
>
>   agg->agg->project->scan
>
> 2nd:
>
>   agg->project->scan
>
>   agg->scan
>
> The query SQL is:
>
>   select count(distinct position_title) from testDruid.employee
>   where birth_date='1961-09-24' group by employee_id
>
> The Initial phase output is:
>
>   LogicalProject(EXPR$0=[$1]): rowcount = 1.5, cumulative cost = {133.1875 rows, 232.5 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 9
>     LogicalAggregate(group=[{0}], EXPR$0=[COUNT(DISTINCT $1)]): rowcount = 1.5, cumulative cost = {131.6875 rows, 231.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 8
>       LogicalProject(employee_id=[$4], position_title=[$14]): rowcount = 15.0, cumulative cost = {130.0 rows, 231.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 7
>         LogicalFilter(condition=[=($0, '1961-09-24')]): rowcount = 15.0, cumulative cost = {115.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 6
>           EnumerableTableScan(table=[[testDruid, employee]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 5
>
> The physical Volcano output is:
>
>   15:59:58.201 [278cbd08-f2c6-306d-e78e-b479d088cd5d:foreman] DEBUG o.a.d.e.p.s.h.DefaultSqlHandler - VOLCANO:Physical Planning (7315ms):
>   ScreenPrel: rowcount = 2500.0, cumulative cost = {77750.0 rows, 970250.0 cpu, 0.0 io, 2.1504E8 network, 440000.00000000006 memory}, id = 115180
>     UnionExchangePrel: rowcount = 2500.0, cumulative cost = {77500.0 rows, 970000.0 cpu, 0.0 io, 2.1504E8 network, 440000.00000000006 memory}, id = 115179
>       ProjectPrel(EXPR$0=[$1]): rowcount = 2500.0, cumulative cost = {75000.0 rows, 950000.0 cpu, 0.0 io, 2.048E8 network, 440000.00000000006 memory}, id = 115178
>         HashAggPrel(group=[{0}], EXPR$0=[$SUM0($1)]): rowcount = 2500.0, cumulative cost = {75000.0 rows, 950000.0 cpu, 0.0 io, 2.048E8 network, 440000.00000000006 memory}, id = 115177
>           HashToRandomExchangePrel(dist0=[[$0]]): rowcount = 25000.0, cumulative cost = {50000.0 rows, 450000.0 cpu, 0.0 io, 2.048E8 network, 0.0 memory}, id = 115176
>             ProjectPrel(employee_id=[$0], EXPR$0=[$1]): rowcount = 25000.0, cumulative cost = {25000.0 rows, 50000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 115175
>               ScanPrel(groupscan=[DruidGroupScan{groupScanSpec=DruidGroupScanSpec{dataSource='employee', druidQueryConvertState=MODIFIED, filter=null, allFiltersConverted=true, hasMultiValueFilters=false, groupbyColumns=[employee_id, position_title], query=StreamingGroupByQuery{groupByQuery=GroupByQuery{dataSource='employee', querySegmentSpec=LegacySegmentSpec{intervals=[1000-01-01T00:00:00.000+08:05:43/3000-01-01T00:00:00.000+08:00]}, limitSpec=NoopLimitSpec, dimFilter=null, granularity=AllGranularity, dimensions=[DefaultDimensionSpec{dimension='employee_id', outputName='employee_id'}], aggregatorSpecs=[DistinctCountAggregatorFactory{name='EXPR$0', fieldName='position_title'}], postAggregatorSpecs=[], havingSpec=null}, batchSize=50000}}, columns=[`employee_id`, `EXPR$0`], segmentIntervals=[1961-09-24T00:00:00.000+08:00/1961-09-25T00:00:00.000+08:00]}]): rowcount = 25000.0, cumulative cost = {25000.0 rows, 50000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 115081
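(As a rough illustration of the operand patterns described in the quoted message, e.g. agg->agg->scan, here is a hypothetical rule skeleton against Calcite's RelOptRule API; the class name is made up and the actual Druid push-down rewrite is omitted.)

  import org.apache.calcite.plan.RelOptRule;
  import org.apache.calcite.plan.RelOptRuleCall;
  import org.apache.calcite.rel.core.Aggregate;
  import org.apache.calcite.rel.core.TableScan;
  import org.apache.calcite.rel.logical.LogicalAggregate;

  /** Hypothetical skeleton: matches Aggregate over Aggregate over TableScan. */
  public class AggAggScanPushDownRule extends RelOptRule {

    public static final AggAggScanPushDownRule INSTANCE =
        new AggAggScanPushDownRule();

    private AggAggScanPushDownRule() {
      // The operand tree controls which plan shapes the rule fires on;
      // keeping it narrow limits the number of matches the planner explores.
      super(
          operand(LogicalAggregate.class,
              operand(LogicalAggregate.class,
                  operand(TableScan.class, none()))),
          "AggAggScanPushDownRule");
    }

    @Override public void onMatch(RelOptRuleCall call) {
      final Aggregate outerAgg = call.rel(0);
      final Aggregate innerAgg = call.rel(1);
      final TableScan scan = call.rel(2);
      // Build the pushed-down equivalent (e.g. a Druid group-by scan) here
      // and register it with: call.transformTo(newRel);
    }
  }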
