[ 
https://issues.apache.org/jira/browse/IMPALA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gaoxiaoqing updated IMPALA-10253:
---------------------------------
    Description: 
We have the following Parquet table:
{code:java}
CREATE EXTERNAL TABLE rawdata.event_ros_p1 (
  event_id INT,
  user_id BIGINT,
  time TIMESTAMP,
  p_abook_type STRING 
)
PARTITIONED BY (
  day INT,
  event_bucket INT
)
STORED AS PARQUET
LOCATION 'hdfs://localhost:20500/sa/data/1/event'
{code}
The data looks like this:
||event_id||user_id||time||p_abook_type||
|1|-922235446862664806|2018-07-18 09:01:06.158|小说|
|2|-922235446862664806|2018-07-19 09:01:06.158|小说|

To remap event_id to the real event name, we can implement a dict UDF. 
The UDF is defined as DICT(BIGINT expression, STRING path): the first 
parameter is the column, and the second is the HDFS path of a file that 
stores the remapping rules, like this:
{code:java}
1,SignUp
2,ViewProduct{code}
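As a rough illustration of what such a lookup does, here is a minimal Python sketch. It is not Impala's UDF API: `load_dict` and `dict_lookup` are hypothetical names, the file is assumed to hold one `id,name` pair per line as shown above, and returning None for an id without a rule is an assumption.

```python
import io

def load_dict(lines):
    """Parse 'id,name' lines into an {int: str} mapping."""
    mapping = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        key, name = line.split(",", 1)
        mapping[int(key)] = name
    return mapping

def dict_lookup(mapping, event_id):
    """Return the mapped name, or None when the id has no rule (assumed behavior)."""
    return mapping.get(event_id)

# Stand-in for the contents of the mapping file stored on HDFS.
rules = io.StringIO("1,SignUp\n2,ViewProduct\n")
mapping = load_dict(rules)
print(dict_lookup(mapping, 1))
```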
Then we build a view that adds the dict column on top of the original table:
{code:java}
CREATE VIEW rawdata.event_external_view_p7 AS SELECT events.*, dict(`event_id`, 
'/data/1/event.txt') AS `event` FROM rawdata.event_view_p7 events
{code}
If a query groups by the dict column, it is very slow because every row 
has to be remapped: 
{code:java}
select event, count(*) from event_external_view_p7 where event in ('SignUp', 
'ViewProduct') group by event;{code}
The explain result is:
{code:java}
PLAN-ROOT SINK
|
04:EXCHANGE [UNPARTITIONED]
|
03:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: event
|  row-size=20B cardinality=0
|
02:EXCHANGE [HASH(event)]
|
01:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: rawdata.DICT(event_id, '/data/1/event.txt')
|  row-size=20B cardinality=0
|
00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
|  partitions=39/39 files=99 size=9.00GB
|  predicates: rawdata.DICT(event_id, '/data/1/event.txt') IN ('SignUp', 
'ViewProduct')
|  row-size=4B cardinality=unavailable
{code}
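We can modify the plan by rewriting the AGGREGATE node and the SCAN node, so 
that the scan filters on event_id, the aggregation groups by event_id, and 
dict is applied only to the aggregated output. The new plan would look like 
this: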
{code:java}
PLAN-ROOT SINK
|
05:SELECT [FINALIZE]
|  output: dict(event_id)
|  row-size=20B cardinality=0
|
04:EXCHANGE [UNPARTITIONED]
|
03:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: event_id
|  row-size=20B cardinality=0
|
02:EXCHANGE [HASH(event_id)]
|
01:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: event_id
|  row-size=20B cardinality=0
|
00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
|  partitions=39/39 files=99 size=9.00GB
|  predicates: event_id IN (1, 2)
|  row-size=4B cardinality=unavailable
{code}
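The rewrite above rests on two steps: translating the string predicate on `event` into a predicate on `event_id` by inverting the mapping, and grouping on `event_id` so that the mapping is applied once per group instead of once per row. The following Python sketch illustrates the idea only; it is not Impala planner code, and all function names and the sample data are hypothetical. It also merges counts after mapping, in case two ids share a name.

```python
from collections import Counter

# Hypothetical mapping, as loaded from the rule file (id -> name).
mapping = {1: "SignUp", 2: "ViewProduct"}

def ids_for_names(mapping, names):
    """Translate `event IN (names)` into the matching set of event_id values."""
    wanted = set(names)
    return {event_id for event_id, name in mapping.items() if name in wanted}

def count_naive(rows, mapping, names):
    """Original plan: evaluate the mapping for every scanned row."""
    wanted = set(names)
    counts = Counter()
    for event_id in rows:
        name = mapping.get(event_id)  # one lookup per row
        if name in wanted:
            counts[name] += 1
    return dict(counts)

def count_rewritten(rows, mapping, names):
    """Proposed plan: filter and group on event_id, then map once per group."""
    ids = ids_for_names(mapping, names)
    by_id = Counter(event_id for event_id in rows if event_id in ids)
    counts = Counter()
    for event_id, n in by_id.items():
        # One lookup per group; ids that share a name are merged here.
        counts[mapping[event_id]] += n
    return dict(counts)

rows = [1, 2, 2, 3, 1, 2]  # sample event_id values; 3 has no rule
names = ["SignUp", "ViewProduct"]
assert count_naive(rows, mapping, names) == count_rewritten(rows, mapping, names)
print(count_rewritten(rows, mapping, names))
```

Both functions return the same counts; the difference is that the rewritten version performs one lookup per distinct event_id rather than one per row.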
 

 

  was:
We have the following Parquet table:
{code:java}
CREATE EXTERNAL TABLE rawdata.event_ros_p1 (
  event_id INT,
  user_id BIGINT,
  time TIMESTAMP,
  p_abook_type STRING 
)
PARTITIONED BY (
  day INT,
  event_bucket INT
)
STORED AS PARQUET
LOCATION 'hdfs://localhost:20500/sa/data/1/event'
{code}
The data looks like this:
||event_id||user_id||time||p_abook_type||
|1|-922235446862664806|2018-07-18 09:01:06.158|小说|
|2|-922235446862664806|2018-07-19 09:01:06.158|小说|

Now we need to remap event_id to the real event name shown to customers. The 
remapping rules look like this:
{code:java}
1,SignUp
2,ViewProduct{code}
We can implement a UDF that remaps event_id to event_name, with the rules 
stored on HDFS, and then build a view:
{code:java}
CREATE VIEW rawdata.event_external_view_p7 AS SELECT events.*, dict(`event_id`, 
'/data/1/event.txt') AS `event` FROM rawdata.event_view_p7 events
{code}
If a query groups by the dict UDF, it is very slow because every row has to 
be remapped: 
{code:java}
select event, count(*) from event_external_view_p7 where event in ('SignUp', 
'ViewProduct') group by event;{code}
The explain result is:
{code:java}
PLAN-ROOT SINK
|
04:EXCHANGE [UNPARTITIONED]
|
03:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: event
|  row-size=20B cardinality=0
|
02:EXCHANGE [HASH(event)]
|
01:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: rawdata.dict(event_id)
|  row-size=20B cardinality=0
|
00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
|  partitions=39/39 files=99 size=9.00GB
|  predicates: rawdata.dict(event_id) IN ('SignUp', 'ViewProduct')
|  row-size=4B cardinality=unavailable
{code}
We can modify the plan by rewriting the AGGREGATE node and the SCAN node. The 
new plan would look like this:
{code:java}
PLAN-ROOT SINK
|
05:SELECT [FINALIZE]
|  output: dict(event_id)
|  row-size=20B cardinality=0
|
04:EXCHANGE [UNPARTITIONED]
|
03:AGGREGATE [FINALIZE]
|  output: count:merge(*)
|  group by: event_id
|  row-size=20B cardinality=0
|
02:EXCHANGE [HASH(event)]
|
01:AGGREGATE [STREAMING]
|  output: count(*)
|  group by: event_id
|  row-size=20B cardinality=0
|
00:SCAN HDFS [rawdata.event_ros_p7_merge_offline]
|  partitions=39/39 files=99 size=9.00GB
|  predicates: event_id IN (1, 2)
|  row-size=4B cardinality=unavailable
{code}
 

 


> Improve query performance contains dict function
> ------------------------------------------------
>
>                 Key: IMPALA-10253
>                 URL: https://issues.apache.org/jira/browse/IMPALA-10253
>             Project: IMPALA
>          Issue Type: New Feature
>          Components: Frontend
>            Reporter: gaoxiaoqing
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
