[ 
https://issues.apache.org/jira/browse/IMPALA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220603#comment-17220603
 ] 

gaoxiaoqing commented on IMPALA-10253:
--------------------------------------

In addition to rewriting the filter in the SCAN phase, I think it is also 
necessary to rewrite the query at the AGGREGATE phase, because this reduces 
the amount of data that needs to be remapped. The rewrites would look like 
this.

e.g.

before:
{code:java}
select event from event_external_view_p7 
where event in ('SignUp', 'ViewProduct');{code}
After the rewrite, we no longer need to reverse-map each scanned row. The 
Frontend reads the dict file from HDFS and maps 'SignUp' and 'ViewProduct' to 
their constant IDs.
{code:java}
select event from event_external_view_p7 
where event_id in (1, 2);
{code}
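The Frontend rewrite above needs the inverse mapping (name -> id). A minimal sketch of that step, assuming a comma-separated dict file as shown in the issue description; the helper names here are hypothetical, not part of the Impala code base:

```python
def load_dict(path):
    """Parse a dict file of 'id,name' lines into {id: name}."""
    mapping = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            key, value = line.split(",", 1)
            mapping[int(key)] = value
    return mapping

def rewrite_in_list(names, id_to_name):
    """Rewrite an IN list of event names into the matching event_id constants."""
    name_to_id = {v: k for k, v in id_to_name.items()}
    return sorted(name_to_id[n] for n in names if n in name_to_id)
```

With the mapping inverted once at plan time, the predicate `event IN ('SignUp', 'ViewProduct')` becomes `event_id IN (1, 2)` and the scan never calls the UDF.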
e.g.

before:
{code:java}
select event, count(*)
from event_external_view_p7
group by event;
{code}
After the rewrite, far less data needs to be reverse-mapped, because dict() 
runs once per group instead of once per scanned row. The rewrite happens in 
the Frontend.
{code:java}
select dict(a.event_id, '/path/to/dict') as event, a.cnt
from (
  select event_id, count(*) as cnt
  from event_external_view_p7
  group by event_id
) a;
{code}
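The outer dict() step in the rewritten query above only touches one row per group. A sketch of that final reverse remap over aggregated results, using an in-memory mapping for illustration (assumed here, not Impala's actual implementation):

```python
def remap_aggregated(rows, id_to_name):
    """Apply the dict mapping to already-aggregated (event_id, count) rows.

    One lookup per group is needed, instead of one lookup per scanned row;
    unmapped ids fall back to their string form.
    """
    return [(id_to_name.get(event_id, str(event_id)), count)
            for event_id, count in rows]
```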
e.g.

before:
{code:java}
select funnel_id, count(*) from (
  select case when event = 'SignUp' then 1 when event = 'ViewProduct' then 2
         else 0 end as funnel_id
  from event_test_hue_view_p1) a
group by funnel_id;
{code}
After the rewrite, we don't need to reverse-map any data at all. The rewrite 
happens in the Frontend.
{code:java}
select funnel_id, count(*) from (
  select case when event_id = 1 then 1 when event_id = 2 then 2
         else 0 end as funnel_id
  from event_test_hue_view_p1) a
group by funnel_id;
{code}
Maybe this JIRA can be split into multiple commits?
 1. rewrite the filter expression in the SCAN phase
 2. rewrite expressions in the AGGREGATE phase

 

> Improve query performance contains dict function
> ------------------------------------------------
>
>                 Key: IMPALA-10253
>                 URL: https://issues.apache.org/jira/browse/IMPALA-10253
>             Project: IMPALA
>          Issue Type: New Feature
>          Components: Frontend
>            Reporter: gaoxiaoqing
>            Assignee: gaoxiaoqing
>            Priority: Major
>
> we have the following parquet table:
> {code:java}
> CREATE EXTERNAL TABLE rawdata.event_ros_p1 (
>   event_id INT,
>   user_id BIGINT,
>   time TIMESTAMP,
>   p_abook_type STRING 
> )
> PARTITIONED BY (
>   day INT,
>   event_bucket INT
> )
> STORED AS PARQUET
> LOCATION 'hdfs://localhost:20500/sa/data/1/event'
> {code}
> the data show as following:
> ||event_id||user_id||time||p_abook_type||
> |1|-922235446862664806|2018-07-18 09:01:06.158|小说|
> |2|-922235446862664806|2018-07-19 09:01:06.158|小说|
> If we want to remap event_id to the real event name, we can implement a dict 
> UDF, defined as DICT(BIGINT expression, STRING path). The first parameter is 
> the column; the second is an HDFS path that stores the remapping rules, like 
> this:
> {code:java}
> 1,SignUp
> 2,ViewProduct{code}
> Then build a view that adds the dict column to the original table:
> {code:java}
> CREATE VIEW rawdata.event_external_view_p7 AS SELECT events.*, 
> dict(`event_id`, '/data/1/event.txt') AS `event` FROM rawdata.event_view_p7 
> events
> {code}
> If the query groups by the dict column, it is slower than grouping by the 
> original column. When we EXPLAIN the SQL, we find that every row needs 
> remapping in both the SCAN phase and the AGGREGATE phase.
> {code:java}
> select event, count(*) from event_external_view_p7 where event in ('SignUp', 
> 'ViewProduct') group by event;{code}
> {code:java}
> PLAN-ROOT SINK
> |
> 04:EXCHANGE [UNPARTITIONED]
> |
> 03:AGGREGATE [FINALIZE]
> |  output: count:merge(*)
> |  group by: event
> |  row-size=20B cardinality=0
> |
> 02:EXCHANGE [HASH(event)]
> |
> 01:AGGREGATE [STREAMING]
> |  output: count(*)
> |  group by: rawdata.DICT(event_id, '/data/1/event.txt')
> |  row-size=20B cardinality=0
> |
> 00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
> |  partitions=39/39 files=99 size=9.00GB
> |  predicates: rawdata.DICT(event_id, '/data/1/event.txt') IN ('SignUp', 
> 'ViewProduct')
> |  row-size=4B cardinality=unavailable
> {code}
> The idea is to modify the plan: use the original column in the SCAN and 
> AGGREGATE phases and remap it only at the end. The new plan looks like this:
> {code:java}
> PLAN-ROOT SINK
> |
> 05:SELECT [FINALIZE]
> |  output: dict(event_id)
> |  row-size=20B cardinality=0
> |
> 04:EXCHANGE [UNPARTITIONED]
> |
> 03:AGGREGATE [FINALIZE]
> |  output: count:merge(*)
> |  group by: event_id
> |  row-size=20B cardinality=0
> |
> 02:EXCHANGE [HASH(event_id)]
> |
> 01:AGGREGATE [STREAMING]
> |  output: count(*)
> |  group by: event_id
> |  row-size=20B cardinality=0
> |
> 00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
> |  partitions=39/39 files=99 size=9.00GB
> |  predicates: event_id IN (1, 2)
> |  row-size=4B cardinality=unavailable
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
