leesf commented on code in PR #7235: URL: https://github.com/apache/hudi/pull/7235#discussion_r1026069202
########## rfc/rfc-63/rfc-63.md: ########## @@ -0,0 +1,370 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +# RFC-63: Index Function for Optimizing Query Performance + +## Proposers + +- @yihua +- @alexeykudinkin + +## Approvers + +- @vinothchandar +- @xushiyan +- @nsivabalan + +## Status + +JIRA: [HUDI-512](https://issues.apache.org/jira/browse/HUDI-512) + +## Abstract + +In this RFC, we address the problem of accelerating queries containing predicates based on functions defined on a +column, by introducing **Index Function**, a new indexing capability for efficient file pruning. + +## Background + +To make the queries finish faster, one major optimization technique is to scan less data by pruning rows that are not +needed by the query. This is usually done in two ways: + +- **Partition pruning**: The partition pruning relies on a table with physical partitioning, such as Hive partitioning. + A partitioned table uses a chosen column such as the date of `timestamp` and stores the rows with the same date to the + files under the same folder or physical partition, such as `date=2022-10-01/`. When the predicate in a query + references the partition column of the physical partitioning, the files in the partitions not matching the predicate + are filtered out, without scanning. For example, for the predicate `date between '2022-10-01' and '2022-10-02'`, the + partition pruning only returns the files from two partitions, `2022-10-01` and `2022-10-02`, for further processing. + The granularity of the pruning is at the partition level. + + +- **File pruning**: The file pruning carries out the pruning of the data at the file level, with the help of file-level + or record-level index. For example, with column stats index containing minimum and maximum values of a column for each + file, the files falling out of the range of the values compared to the predicate can be pruned. For a predicate + with `age < 20`, the file pruning filters out a file with columns stats of `[30, 40]` as the minimum and maximum + values of the column `age`. + +While Apache Hudi already supports partition pruning and file pruning with data skipping for different query engines, we +recognize that the following use cases need better query performance and usability: + +- File pruning based on functions defined on a column +- Efficient file pruning for files without physical partitioning +- Effective file pruning after partition evolution, without rewriting data + +Next, we explain these use cases in detail. + +### Use Case 1: Pruning files based on functions defined on a column + +Let's consider a non-partitioned table containing the events with a `timestamp` column. The events with naturally +increasing time are ingested into the table with bulk inserts every hour. In this case, assume that each file should +contain rows for a particular hour: + +| File Name | Min of `timestamp` | Max of `timestamp` | Note | +|---------------------|--------------------|--------------------|--------------------| +| base_file_1.parquet | 1664582400 | 1664586000 | 2022-10-01 12-1 AM | +| base_file_2.parquet | 1664586000 | 1664589600 | 2022-10-01 1-2 AM | +| ... | ... | ... | ... | +| base_file_13.parquet | 1664625600 | 1664629200 | 2022-10-01 12-1 PM | +| base_file_14.parquet | 1664629200 | 1664632800 | 2022-10-01 1-2 PM | +| ... | ... | ... | ... | +| base_file_37.parquet | 1664712000 | 1664715600 | 2022-10-02 12-1 PM | +| base_file_38.parquet | 1664715600 | 1664719200 | 2022-10-02 1-2 PM | + +For a query to get the number of events between 12PM and 2PM each day in a month for time-of-day analysis, the +predicates look like `DATE_FORMAT(timestamp, '%Y-%m-%d') between '2022-10-01' and '2022-10-31'` +and `DATE_FORMAT(timestamp, '%H') between '12' and '13'`. If the data is in a good layout as above, we only need to scan +two files (instead of 24 files) for each day of data, e.g., `base_file_13.parquet` and `base_file_14.parquet` containing +the data for 2022-10-01 12-2 PM. + +Currently, such a fine-grained file pruning based on a function on a column cannot be achieved in Hudi, because +transforming the `timestamp` to the hour of day is not order-preserving, thus the file pruning cannot directly leverage +the file-level column stats of the original column of `timestamp`. In this case, Hudi has to scan all the files for a +day and push the predicate down when reading parquet files, increasing the amount of data to be scanned. + +### Use Case 2: Efficient file pruning for files without physical partitioning + +Let's consider the same non-partitioned table as in the Use Case 1, containing the events with a `timestamp` column. The +difference here is that there are late-arriving data in each batch, meaning that some events contain the `timestamp` +from a few days ago. In realistic scenarios, this happens frequently, where the rows are not strictly grouped or +clustered for any column. + +In the current write operations for a Hudi table, there is no particular data co-location scheme except for sorting mode +based on record key for bulk insert. Hudi also has a mechanism of small file handling, adding new insert records to +existing file groups. As the ingestion makes progress, each file may contain records for a wide range between minimum +and maximum values for a particular column or a function applied on a column, making file pruning based on the +file-level column stats less effective. + +### Use Case 3: File pruning support after partition evolution + +Partition evolution refers to the process of changing the partition columns for writing data to the storage. This +requirement comes up when a user would like to reduce the number of physical partitions and improve the file sizing. + +Consider a case where event logs are stream from microservices and ingested into a raw event table. Each event log +contains a `timestamp` and an associated organization ID (`org_id`). Most queries on the table are organization specific +and fetch logs for a particular time range. A user may attempt to physically partition the data by both `org_id` +and `date(timestamp)`. If there are 1K organization IDs and one year of data, such a physical partitioning scheme writes +at least `365 days x 1K IDs = 365K` data files under 365K partitions. In most cases, the data can be highly skewed based +on the organizations, with most organizations having less data and a handful of organizations having the majority of the +data, so that there can be many small data files. In such a case, the user may want to evolve the partitioning by +using `org_id` only without rewriting existing data, resulting in the physical layout of data like below + +| Physical partition path | File Name | Min of datestr | Max of datestr | Note | +|------------------------------|----------------------|----------------|----------------|-------------------------| +| org_id=1/datestr=2022-10-01/ | base_file_1.parquet | `2022-10-01` | `2022-10-01` | Old partitioning scheme | +| org_id=1/datestr=2022-10-02/ | base_file_2.parquet | `2022-10-02` | `2022-10-02` | | +| org_id=2/datestr=2022-10-01/ | base_file_3.parquet | `2022-10-01` | `2022-10-01` | | +| org_id=3/datestr=2022-10-01/ | base_file_4.parquet | `2022-10-01` | `2022-10-01` | | +| ... | ... | ... | ... | ... | +| org_id=1/ | base_file_10.parquet | `2022-10-10` | `2022-10-11` | New partitioning scheme | +| org_id=2/ | base_file_11.parquet | `2022-10-10` | `2022-10-15` | | +| ... | ... | ... | ... | ... | + +As queries need to look for data for a particular time range, instead of relying on partition pruning, we can still use +file pruning with file-level column stats. For the example above, even in the new partitioning scheme, without data +being physically partitioned by the datestr, the data can still be co-located based the date, because of natural +ingestion order or Hudi's clustering operation. In this case, we can effectively prune files based on the range of Review Comment: in new partitioning schema, if data is lately arrived at hudi, and the data goes to `base_file_11.parquet`, the min/max range would be very large, so the file pruning performance is still poor, how can we handle the situation? would only by clustering? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org