[ 
https://issues.apache.org/jira/browse/HUDI-7246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo closed HUDI-7246.
---------------------------
    Resolution: Fixed

> Data Skipping Issue: No Results When Query Conditions Involve Both Columns 
> with and without Column Stats
> --------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7246
>                 URL: https://issues.apache.org/jira/browse/HUDI-7246
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ma Jian
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 0.15.0, 1.0.0
>
>
> In the current code version, support for column stats has not yet been 
> extended to handle complex nested data types, such as map-type data 
> structures. Take the table tbl as an example, which is defined with three 
> fields: an integer field id, a string field name, and a map-type field 
> attributes. Within this table structure, the id and name fields support 
> column stats, and as such, HUDI will generate the corresponding column stats 
> indices for these two fields at the time of table creation. However, no 
> corresponding index will be generated for the attributes field. The specific 
> table creation statement is as follows:
> create table tbl (
>   id int,
>   name string,
>   attributes map<string, string>
> ) ...
> To elaborate further, consider the following insert operation:
> insert into tbl values
> (1, 'a1', map('color', 'red', 'size', 'M')),
> (2, 'a2', map('color', 'blue', 'size', 'L'));
> After the execution of the insert, the content of the column stats should be 
> as follows:
> a.parquet   id     min: 1   max: 1   null: 0
> b.parquet   id     min: 2   max: 2   null: 0
> a.parquet   name   min: 'a1' max: 'a1' null: 0
> b.parquet   name   min: 'a2' max: 'a2' null: 0
> This means that there is no column stats index for the attributes column.
> Based on the table tbl, when we execute a query:
> h3. 1. Queries containing only columns supported by column stats
> For a query that filters only on name (for example, name = 'a1'), the data 
> skipping code looks like this:
> columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
>   Some(getCandidateFiles(transposedColStatsDF, queryFilters))
> }
> The content of queryReferencedColumns is name, and the content of 
> queryFilters is isnotnull(name#94) and (name#94 = a1). The 
> transposedColStatsDF is then based on the queryReferencedColumns to select 
> the corresponding column stats:
> +--------------------+----------+-------------+-------------+--------------+
> |            fileName|valueCount|name_minValue|name_maxValue|name_nullCount|
> +--------------------+----------+-------------+-------------+--------------+
> |688f0d1e-527b-480...|         1|           a1|           a1|             0|
> |08999951-faa4-48e...|         1|           a2|           a2|             0|
> +--------------------+----------+-------------+-------------+--------------+
> Inside the getCandidateFiles function, indexSchema and indexFilter are 
> similar to the two parameters above, with the main difference being that in 
> indexFilter, isnotnull(name#94) is converted into ('name_nullCount < 
> 'valueCount). That is, a file can contain a non-null name only if its null 
> count is less than its total value count. Thus, prunedCandidateFileNames 
> correctly selects the required files.
> private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression]): Set[String] = {
>   val indexSchema = indexDf.schema
>   val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
>   val prunedCandidateFileNames =
>     indexDf.where(new Column(indexFilter))
>       .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
>       .collect()
>       .map(_.getString(0))
>       .toSet
>   val allIndexedFileNames =
>     indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
>       .collect()
>       .map(_.getString(0))
>       .toSet
>   val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
>   prunedCandidateFileNames ++ notIndexedFileNames
> }
> h3. 2. Queries containing columns not supported by column stats
> Suppose our query is adjusted to:
> select * from tbl where attributes.color = 'red'
> At this time, queryReferencedColumns is attributes, and queryFilters are 
> isnotnull(attributes#95) and (attributes#95[color] = red). Since 
> transposedColStatsDF does not have column stats for this column, it will be 
> empty. No matter what the query conditions are, prunedCandidateFileNames and 
> allIndexedFileNames in getCandidateFiles will be empty, hitting the logic of 
> notIndexedFileNames and returning all files. Thus, the query is still correct.
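As a rough illustration, cases 1 and 2 can be modelled without Spark. This is a simplified sketch, not Hudi's implementation: FileStats, mayContain, and candidateFiles are hypothetical names, and it assumes lookupFileNamesMissingFromIndex amounts to "all table files minus the indexed ones".

```scala
// Spark-free model of the candidate-file computation for cases 1 and 2.
// All names here are hypothetical; only the shape of the logic follows the issue text.
object CandidateFilesSketch {
  // One transposed index row: per-file stats for the `name` column only.
  final case class FileStats(fileName: String, valueCount: Long,
                             nameMin: String, nameMax: String, nameNullCount: Long)

  // Index-level form of `isnotnull(name) AND name = value`:
  // the file holds at least one non-null name, and value lies within [min, max].
  def mayContain(s: FileStats, value: String): Boolean =
    s.nameNullCount < s.valueCount &&
      s.nameMin.compareTo(value) <= 0 &&
      s.nameMax.compareTo(value) >= 0

  def candidateFiles(index: Seq[FileStats], allFiles: Set[String], value: String): Set[String] = {
    val pruned = index.filter(mayContain(_, value)).map(_.fileName).toSet
    val allIndexed = index.map(_.fileName).toSet
    // Assumption: files missing from the index are always kept as candidates.
    val notIndexed = allFiles -- allIndexed
    pruned ++ notIndexed
  }
}
```

With both files indexed on name, only the file whose range covers 'a1' survives (case 1). With an empty index, every file is treated as not indexed and is returned (case 2).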
> h3. 3. Queries containing both columns supported and not supported by column stats
> Let's consider a query adjusted to:
> select * from tbl where attributes.color = 'red' and name = 'a1'
> At this point, queryReferencedColumns includes attributes and name, and 
> queryFilters combines the filters from cases 1 and 2. The content of 
> transposedColStatsDF is shown below:
> +--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
> |            fileName|valueCount|attributes_minValue|attributes_maxValue|attributes_nullCount|name_minValue|name_maxValue|name_nullCount|
> +--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
> |3363b50e-ca25-4fb...|         1|               null|               null|                   1|           a2|           a2|             0|
> |876af2ed-d529-4df...|         1|               null|               null|                   1|           a1|           a1|             0|
> +--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
> This is due to the transpose method called within loadTransposed, where the 
> row is returned in the following manner. In case 1, since 
> targetIndexedColumns only contains name, the final transposedRows will only 
> contain name. In case 2, colStatsRecords is empty, so all subsequent stream 
> code won't execute, and transposedRows will be empty.
> val transposedRows: HoodieData[Row] = colStatsRecords
>   .filter(/* filter logic */)
>   .mapToPair(/* map logic */)
>   .groupByKey()
>   .map(/* map logic */)
> In case 3, because colStatsRecords have content, the function inside the last 
> map will be executed. The targetIndexedColumns are assigned as follows 
> (selecting relevant code from different functions in sequence):
> private lazy val indexedColumns: Set[String] = {
>   val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
>   if (customIndexedColumns.isEmpty) {
>     tableSchema.fieldNames.toSet
>   } else {
>     customIndexedColumns.asScala.toSet
>   }
> }
> This implies that targetIndexedColumns is the intersection of 
> targetColumnNames and indexedColumns, and indexedColumns, by default, 
> includes all the schema fields of the table (tableSchema.fieldNames.toSet) 
> when we haven't explicitly configured a column stats list. This includes the 
> map-type attributes column, and targetColumnNames includes the columns 
> referenced by the query, so the intersection still contains both attributes 
> and name.
> Returning to the code for transposedRows, because there are no corresponding 
> column stats for attributes, rows where min=null, max=null, and 
> null_count=valueCount are added to avoid errors during subsequent where 
> filtering. This is the origin of attributes_minValue, attributes_maxValue, 
> and attributes_nullCount in the resulting DataFrame.
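The padding behaviour described above can be sketched in plain Scala. This is only an illustrative model under stated assumptions: ColStat, Cells, and transpose are hypothetical names, and Option stands in for the nullable min/max values of the real index rows.

```scala
// Simplified model of transposing per-column stats records into one row per file.
// Names and shapes are hypothetical; only the padding rule follows the issue text.
object TransposeSketch {
  // One column-stats record: stats of a single column in a single file.
  final case class ColStat(fileName: String, column: String, valueCount: Long,
                           min: Option[String], max: Option[String], nullCount: Long)

  // Transposed cells for one column: (minValue, maxValue, nullCount).
  type Cells = (Option[String], Option[String], Long)

  def transpose(records: Seq[ColStat], targetColumns: Seq[String]): Map[String, (Long, Seq[Cells])] =
    records.groupBy(_.fileName).map { case (file, recs) =>
      val valueCount = recs.head.valueCount
      val byColumn = recs.map(r => r.column -> r).toMap
      val cells: Seq[Cells] = targetColumns.sorted.map { col =>
        byColumn.get(col) match {
          case Some(r) => (r.min, r.max, r.nullCount)
          // No stats for this column: pad with empty min/max and nullCount = valueCount.
          case None => (None, None, valueCount)
        }
      }
      file -> (valueCount, cells)
    }
}
```

When there are no records at all (case 2), groupBy yields nothing and the result is empty. When records exist for name but not for attributes (case 3), each file still gets an attributes entry whose null count equals its value count.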
> Let's revisit getCandidateFiles and take a closer look at the assembly of 
> indexFilter. The queryFilters mentioned earlier are processed in 
> translateIntoColumnStatsIndexFilterExpr, where expressions are transformed, 
> defaulting to True if they cannot be converted. We can skip over name since 
> it's a string type and can be converted directly.
> For the map-type column, there are two expressions. The first, 
> isnotnull(attributes#95), hits the case:
> case IsNotNull(attribute: AttributeReference) =>
>   getTargetIndexedColumnName(attribute, indexSchema)
>     .map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))
> The corresponding value is 'attributes_nullCount < 'valueCount.
> However, EqualTo expressions like (attributes#95[color] = red) should, 
> theoretically, match equalTo cases but actually do not since 
> AllowedTransformationExpression does not support the getMapValue type, 
> resulting in a None and no match.
> Ultimately, nothing matches, resulting in None, which means the expression 
> for attributes ends up being 'attributes_nullCount < 'valueCount And True.
> Herein lies a problem:
> acc ++= Seq(null, null, valueCount)
> The null_count for attributes always equals valueCount, so the expression 
> 'attributes_nullCount < 'valueCount will always be False. As a result, 
> prunedCandidateFileNames will always be empty. However, since indexDf does 
> contain rows, allIndexedFileNames will not be empty, so notIndexedFileNames 
> will be empty as well.
> Consequently, the end result is that getCandidateFiles will always return an 
> empty set! This is a significant issue: instead of leaving the attributes 
> column out of the pruning decision, indexFilter rejects every file, because 
> the padded null_count always equals valueCount and the condition is 
> therefore perpetually false.
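The failure in case 3 can be reproduced with a small Spark-free model. This is a sketch under assumptions, not the actual Hudi code path: Row, indexFilter, and candidateFiles are hypothetical names, and the untranslatable map predicate is represented by a literal true.

```scala
// Model of case 3: the padded attributes stats make the translated filter reject every file.
// All names are hypothetical; the predicates mirror the translated expressions in the issue.
object MixedColumnsBugSketch {
  final case class Row(fileName: String, valueCount: Long, attributesNullCount: Long,
                       nameMin: String, nameMax: String, nameNullCount: Long)

  def indexFilter(r: Row, name: String): Boolean = {
    // isnotnull(attributes) becomes attributes_nullCount < valueCount
    val attributesNotNull = r.attributesNullCount < r.valueCount
    // attributes[color] = 'red' cannot be translated, so it defaults to true
    val attributesColorEq = true
    // isnotnull(name) AND name = <value>
    val nameNotNull = r.nameNullCount < r.valueCount
    val nameEq = r.nameMin.compareTo(name) <= 0 && r.nameMax.compareTo(name) >= 0
    attributesNotNull && attributesColorEq && nameNotNull && nameEq
  }

  def candidateFiles(index: Seq[Row], allFiles: Set[String], name: String): Set[String] = {
    val pruned = index.filter(indexFilter(_, name)).map(_.fileName).toSet
    // Every file has an index row, so nothing is added back as "not indexed".
    val notIndexed = allFiles -- index.map(_.fileName).toSet
    pruned ++ notIndexed
  }
}
```

Because attributesNullCount is padded to valueCount for every row, attributesNotNull is false for every file, and the candidate set is empty even though one file does contain name = 'a1'.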
> h3. 4. Explicitly specifying the supported index columns based on case 3
> Here is an additional scenario to consider, building on case 3: if the index 
> columns are explicitly specified, the issue mentioned will not occur.
> This is because the following code will hit 
> metadataConfig.getColumnsEnabledForColumnStatsIndex, so indexedColumns will 
> not contain any unspecified columns. Consequently, columns that are not 
> supported will not be added to indexDf, and it will remain empty.
> private lazy val indexedColumns: Set[String] = {
>   val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
>   // Column Stats Index could index either
>   //    - The whole table
>   //    - Only configured columns
>   if (customIndexedColumns.isEmpty) {
>     tableSchema.fieldNames.toSet
>   } else {
>     customIndexedColumns.asScala.toSet
>   }
> }
> Simultaneously, indexSchema is also obtained from the schema of indexDf, so 
> it will not include any unsupported columns. Therefore, query conditions for 
> unsupported columns will also be filtered out in the indexFilter.
> val indexSchema = indexDf.schema
> val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
> h3. Problem Summary
> Under the existing code implementation, when data skipping is enabled and the 
> index columns have not been explicitly specified in the parameters, if the 
> query conditions include both columns that are supported by column stats and 
> those that are not, no data will ever be retrieved!
> h3. Problem Fix
> We can see that the variables directly causing the issue are indexDf and 
> indexFilter:
>  * indexDf includes unsupported columns, filled with inappropriate values.
>  * indexFilter also contains query conditions not supported by the index, and 
> indexDf is filled with inappropriate values to accommodate these conditions.
>  * Moreover, the unsupported query conditions in indexFilter are obtained 
> from filtering the schema derived from indexDf.
> Thus, the root of all problems lies in indexDf being filled with unsupported 
> columns.
> Therefore, when fixing the issue, we consider removing the columns that are 
> not supported from the relevant schema before obtaining the column stats 
> records. There are two scenarios to consider:
>  * If index columns are not specified, then by default all columns that 
> support column stats are indexed. By removing the unsupported columns, we 
> effectively remove only columns that are not indexed. This will not affect 
> the query results.
>  * If specific index columns are specified, it might be possible that some 
> columns that do support indexing are not indexed. In this case, what we 
> remove is not the complete set of non-indexed columns. But as argued in case 
> 4, this situation will also not affect the query results.
> If subsequent changes or an incomplete consideration of types cause columns 
> that do have indexes to be removed, at worst, it might temporarily affect the 
> data skipping performance. However, the filter will return to the computation 
> layer for further filtering, so it will not impact the query results.
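The proposed fix can be sketched as a filter on the schema before the index is consulted. This is a hypothetical model, not the actual patch: FieldType, Field, supportsColumnStats, and targetIndexedColumns are illustrative names, and the set of supported types is an assumption made for this example.

```scala
// Sketch of the fix idea: drop columns whose type cannot carry column stats
// before intersecting with the columns referenced by the query.
// Types and function names are hypothetical.
object SupportedColumnsSketch {
  sealed trait FieldType
  case object IntType extends FieldType
  case object StringType extends FieldType
  case object MapType extends FieldType

  final case class Field(name: String, dataType: FieldType)

  // Assumption for this sketch: only the non-nested types support column stats.
  def supportsColumnStats(t: FieldType): Boolean = t match {
    case IntType | StringType => true
    case MapType => false
  }

  def targetIndexedColumns(schema: Seq[Field],
                           configured: Set[String],
                           queryColumns: Set[String]): Set[String] = {
    val supported = schema.filter(f => supportsColumnStats(f.dataType)).map(_.name).toSet
    // No explicit configuration: index every supported column.
    val indexed = if (configured.isEmpty) supported else configured.intersect(supported)
    queryColumns.intersect(indexed)
  }
}
```

For the table tbl with no configured columns, a query referencing attributes and name would then load stats only for name, so no padded attributes columns reach indexDf or indexFilter.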



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
