[ 
https://issues.apache.org/jira/browse/IMPALA-7751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16665494#comment-16665494
 ] 

Thomas Tauber-Marshall commented on IMPALA-7751:
------------------------------------------------

Yeah, this would be a significant change, I think.

One issue is that Impala currently treats Kudu table partitioning as a black 
box, for a few reasons: Impala doesn't have a built-in concept of multi-level 
partitioning schemes (see IMPALA-5255), and we don't want to have to guarantee 
that Impala's hash partitioning operates the same way as Kudu's. So we 
currently just use a Kudu API call to determine the partition for each row. 
To push the predicates down, we would probably need a Kudu API that doesn't 
currently exist (though I suppose we could sort of hack it by setting up a 
scan over the table we're inserting into and then seeing which partitions the 
scan tokens correspond to), or we would need to fix the issues mentioned 
above.

Unfortunately, while bulk inserts into Kudu are definitely a pain point for 
Impala, it's not really an area that's on the current roadmap. Of course, we 
always welcome contributions and I'm happy to work with anyone who wants to 
take some of this on.

> Kudu insert statement should push down range partition predicates
> -----------------------------------------------------------------
>
>                 Key: IMPALA-7751
>                 URL: https://issues.apache.org/jira/browse/IMPALA-7751
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Quanlong Huang
>            Priority: Major
>         Attachments: metrics1.tsv, metrics2.tsv, metrics3.tsv, profile.txt
>
>
> We have a job that loads newly added data from HDFS into a Kudu table to get 
> good point-query performance. Each day we create a new Kudu range partition 
> for that day's data. As we added more and more Kudu range partitions, the 
> job's performance degraded.
> The root cause is that the insert statement for Kudu does not leverage the 
> partition predicates on Kudu range partition keys, which causes skew on the 
> insert nodes.
> How to reproduce:
> Step 1: Launch an Impala cluster with 3 nodes.
> Step 2: Create an HDFS table with more than 3 underlying files, so that it 
> has more than 3 scan ranges.
> {code:sql}
> create table default.metrics_tbl (
>   source_id string,
>   event_timestamp bigint,
>   value double
> ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;
> {code}
> Upload the three attached tsv files into its directory and refresh this table 
> in Impala.
> Step 3: Create a Kudu table with mixed partitioning: 3 hash partitions and 
> 3 range partitions.
> {code:sql}
> create table default.metrics_kudu_tbl (
>   source_id string,
>   event_timestamp bigint,
>   value double,
>   primary key(source_id, event_timestamp)
> ) partition by
>   hash (source_id) PARTITIONS 3,
>   range (event_timestamp) (
>     partition 0 <= values < 10000,
>     partition 10000 <= values < 20000,
>     partition 20000 <= values < 30000
> ) stored as kudu;
> {code}
> Step 4: Insert rows from the HDFS table into Kudu, with predicates on the 
> range partition key.
> {code:sql}
> insert into table metrics_kudu_tbl
>   select source_id, event_timestamp, value from metrics_tbl
>   where event_timestamp >= 10000 and event_timestamp < 20000;
> {code}
> Step 5: In the profile there are three fragment instances containing a 
> KuduTableSink, but only one of them received and wrote data.
> {code}
>     Averaged Fragment F01:
>       KuduTableSink:
>          - TotalNumRows: 1.00K (1000)
>     Fragment F01:
>       Instance 6347506799a2966d:6e82f49200000004
>         KuduTableSink:
>            - TotalNumRows: 3.00K (3000)
>       Instance 6347506799a2966d:6e82f49200000005
>         KuduTableSink:
>            - TotalNumRows: 0 (0)
>       Instance 6347506799a2966d:6e82f49200000003
>         KuduTableSink:
>            - TotalNumRows: 0 (0)
> {code}
> Thus, only one fragment instance of F01 is sorting and ingesting data into 
> Kudu.
> In general, if there are N range partitions and all the inserted rows 
> belong to one range (as determined by the partition predicates in the WHERE 
> clause), only 1/N of the insert fragment instances produce data.
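
The skew described above can be reproduced with a small toy model. The sketch below is only an illustration under stated assumptions: it assumes partitions are numbered hash-bucket-major and that rows are routed to the sink instance given by the partition index modulo the number of instances. Neither rule is confirmed here as what Impala or Kudu actually does, the hash bucket is passed in directly instead of being computed from source_id, and all function names are hypothetical.

```python
# Toy model only: the numbering and routing rules below are assumptions
# chosen for illustration, not Impala's or Kudu's actual implementation.
NUM_HASH = 3        # hash (source_id) PARTITIONS 3
RANGE_BOUNDS = [(0, 10000), (10000, 20000), (20000, 30000)]
NUM_INSTANCES = 3   # one KuduTableSink fragment instance per node


def range_index(event_timestamp):
    """Return the index of the range partition containing the timestamp."""
    for i, (lo, hi) in enumerate(RANGE_BOUNDS):
        if lo <= event_timestamp < hi:
            return i
    raise ValueError("no range partition covers %d" % event_timestamp)


def partition_index(hash_bucket, event_timestamp):
    """Assumed numbering: hash bucket major, range partition minor."""
    return hash_bucket * len(RANGE_BOUNDS) + range_index(event_timestamp)


def sink_instance(hash_bucket, event_timestamp):
    """Assumed routing: partition index modulo the number of instances."""
    return partition_index(hash_bucket, event_timestamp) % NUM_INSTANCES


def rows_per_instance(rows):
    """Count rows per sink instance; each row is (hash_bucket, timestamp)."""
    counts = {i: 0 for i in range(NUM_INSTANCES)}
    for hash_bucket, event_timestamp in rows:
        counts[sink_instance(hash_bucket, event_timestamp)] += 1
    return counts


# 3000 rows spread evenly over the hash buckets, all inside [10000, 20000),
# mirroring the WHERE clause of the insert statement in Step 4.
rows = [(i % NUM_HASH, 10000 + i) for i in range(3000)]
print(rows_per_instance(rows))
```

Under these assumptions the three partitions of the selected range get indexes 1, 4 and 7, which all map to the same instance, so one sink receives all 3000 rows and the other two receive none. That matches the shape of the profile in Step 5, but it does not show that this is the mechanism at work in Impala.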


