[ 
https://issues.apache.org/jira/browse/IMPALA-7751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Quanlong Huang updated IMPALA-7751:
-----------------------------------
    Attachment: metrics3.tsv
                metrics2.tsv
                metrics1.tsv

> Kudu insert statement should push down range partition predicates
> -----------------------------------------------------------------
>
>                 Key: IMPALA-7751
>                 URL: https://issues.apache.org/jira/browse/IMPALA-7751
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Quanlong Huang
>            Priority: Major
>         Attachments: metrics1.tsv, metrics2.tsv, metrics3.tsv, profile.txt
>
>
> We have a job that loads newly added data from HDFS into a Kudu table to get 
> good point-query performance. Each day we create a new Kudu range partition 
> for that day's data. As the number of range partitions grows, the job gets 
> slower.
> The root cause is that the Kudu insert statement does not use the predicates 
> on the range partition keys, which causes skew across the insert nodes.
> Steps to reproduce:
> Step 1: Launch an Impala cluster with 3 nodes.
> Step 2: Create an HDFS table with at least 3 underlying files, so that it has 
> at least 3 scan ranges.
> {code:sql}
> create table default.metrics_tbl (
>   source_id string,
>   event_timestamp bigint,
>   value double
> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
> {code}
> Upload the three attached TSV files (metrics1.tsv, metrics2.tsv, 
> metrics3.tsv) into the table's directory and refresh the table in Impala.
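> The refresh mentioned above can be sketched as follows. The row count is only 
> a sanity check that the uploaded files are visible; it is an addition for 
> illustration and not part of the original report.
> {code:sql}
> -- Make Impala pick up the newly uploaded files
> refresh default.metrics_tbl;
> -- Optional check that rows from the uploaded files are readable
> select count(*) from default.metrics_tbl;
> {code}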
> Step 3: Create a Kudu table with mixed partitioning: 3 hash partitions and 
> 3 range partitions.
> {code:sql}
> create table default.metrics_kudu_tbl (
>   source_id string,
>   event_timestamp bigint,
>   value double,
>   primary key(source_id, event_timestamp)
> ) partition by
>   hash (source_id) PARTITIONS 3,
>   range (event_timestamp) (
>     partition 0 <= values < 10000,
>     partition 10000 <= values < 20000,
>     partition 20000 <= values < 30000
> ) stored as kudu;
> {code}
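> For context, the daily partition creation described at the top would add one 
> more range to this table each day. A minimal sketch, assuming the next range 
> continues the same width of 10000 (the bounds here are hypothetical):
> {code:sql}
> -- Add the range partition for the next batch of data
> alter table default.metrics_kudu_tbl
>   add range partition 30000 <= values < 40000;
> {code}
> Each added range increases N in the skew estimate given at the end of this 
> description.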
> Step 4: Insert rows from the HDFS table into Kudu, with predicates on the 
> range partition key.
> {code:sql}
> insert into table metrics_kudu_tbl
>   select source_id, event_timestamp, value from metrics_tbl
>   where event_timestamp >= 10000 and event_timestamp < 20000;
> {code}
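> To inspect the plan without running the insert, the same statement can be 
> prefixed with EXPLAIN. This is only a sketch for investigation; the exact 
> plan text depends on the Impala version, so no output is shown here.
> {code:sql}
> -- Show the query plan, including the exchange in front of the Kudu sink
> explain insert into table metrics_kudu_tbl
>   select source_id, event_timestamp, value from metrics_tbl
>   where event_timestamp >= 10000 and event_timestamp < 20000;
> {code}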
> Step 5: In the profile, there are three fragment instances containing a 
> KuduTableSink, but only one of them received and produced any rows.
> {code:java}
>     Averaged Fragment F01:
>       KuduTableSink:
>          - TotalNumRows: 1.00K (1000)
>     Fragment F01:
>       Instance 6347506799a2966d:6e82f49200000004
>         KuduTableSink:
>            - TotalNumRows: 3.00K (3000)
>       Instance 6347506799a2966d:6e82f49200000005
>         KuduTableSink:
>            - TotalNumRows: 0 (0)
>       Instance 6347506799a2966d:6e82f49200000003
>         KuduTableSink:
>            - TotalNumRows: 0 (0)
> {code}
> Thus, only one fragment instance of F01 is sorting and ingesting data into 
> Kudu.
> Generally, if there are N range partitions and all the inserted rows belong 
> to one range (as determined by the partition predicates in the WHERE clause), 
> only 1/N of the insert fragment instances produce data.
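> To check how the source rows map onto the range partitions, a query along 
> these lines may help. It is a sketch that assumes the fixed range width of 
> 10000 from Step 3; range_idx and num_rows are illustrative names.
> {code:sql}
> -- range_idx 0, 1, 2 correspond to the three ranges defined in Step 3;
> -- any other value means the row falls outside the defined ranges
> select cast(floor(event_timestamp / 10000) as bigint) as range_idx,
>        count(*) as num_rows
> from default.metrics_tbl
> group by 1
> order by 1;
> {code}
> With the predicate from Step 4, only rows with range_idx = 1 are inserted, 
> which matches the single active KuduTableSink instance in the profile.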


