[ https://issues.apache.org/jira/browse/HUDI-7246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ethan Guo closed HUDI-7246.
---------------------------
    Resolution: Fixed

Data Skipping Issue: No Results When Query Conditions Involve Both Columns with and without Column Stats
--------------------------------------------------------------------------------------------------------

        Key: HUDI-7246
        URL: https://issues.apache.org/jira/browse/HUDI-7246
    Project: Apache Hudi
 Issue Type: Bug
   Reporter: Ma Jian
   Priority: Critical
     Labels: pull-request-available
    Fix For: 0.15.0, 1.0.0

In the current code, column stats support has not yet been extended to complex nested data types such as maps. Take the table tbl as an example. It has three fields: an integer field id, a string field name, and a map-type field attributes. The id and name fields support column stats, so Hudi generates column stats index entries for them when data is written. No such entries are generated for the attributes field. The table is created as follows:

create table tbl (
  id int,
  name string,
  attributes map<string, string>
) ...

Now consider the following insert:

insert into tbl values
  (1, 'a1', map('color', 'red', 'size', 'M')),
  (2, 'a2', map('color', 'blue', 'size', 'L'));

After the insert, the column stats should contain the following entries:

a.parquet  id    min: 1     max: 1     null: 0
b.parquet  id    min: 2     max: 2     null: 0
a.parquet  name  min: 'a1'  max: 'a1'  null: 0
b.parquet  name  min: 'a2'  max: 'a2'  null: 0

In other words, there is no column stats entry for the attributes column. The following queries against tbl show how data skipping behaves in each case.

h3. 1. Queries containing only columns supported by column stats

In this case, the data skipping code looks like this:

columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
  Some(getCandidateFiles(transposedColStatsDF, queryFilters))
}

Here queryReferencedColumns is name, and queryFilters is isnotnull(name#94) and (name#94 = a1). loadTransposed uses queryReferencedColumns to select the corresponding column stats, so transposedColStatsDF is:

+--------------------+----------+-------------+-------------+--------------+
|            fileName|valueCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------+-------------+--------------+
|688f0d1e-527b-480...|         1|           a1|           a1|             0|
|08999951-faa4-48e...|         1|           a2|           a2|             0|
+--------------------+----------+-------------+-------------+--------------+

Inside getCandidateFiles, indexSchema and indexFilter are derived from these two inputs. The main difference is that in indexFilter, isnotnull(name#94) is translated into ('name_nullCount < 'valueCount). A file contains at least one non-null name value exactly when its null count is less than its total value count. With this filter, prunedCandidateFileNames correctly keeps only the files that may contain matching rows.
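The case 1 translation can be sketched with plain Scala collections. This is a hypothetical, simplified model and not Hudi's API. The `Filter` ADT, `StatsRow` and `translate` are illustrative names. The equality rule (min <= value <= max) is an assumption about how an equality filter maps onto min/max stats.

```scala
// Hypothetical, simplified model of how case 1's filters are evaluated against
// the transposed column stats (plain Scala, not Hudi's API).
sealed trait Filter
case class IsNotNull(col: String) extends Filter
case class EqualTo(col: String, value: String) extends Filter

// One row of transposedColStatsDF: one file, with min/max/null count per column.
case class StatsRow(
    fileName: String,
    valueCount: Long,
    minValue: Map[String, String],
    maxValue: Map[String, String],
    nullCount: Map[String, Long])

// isnotnull(c) => c_nullCount < valueCount
// c = v        => c_minValue <= v AND c_maxValue >= v  (assumed min/max rule)
def translate(f: Filter): StatsRow => Boolean = f match {
  case IsNotNull(c)  => row => row.nullCount(c) < row.valueCount
  case EqualTo(c, v) => row => row.minValue(c).compareTo(v) <= 0 && row.maxValue(c).compareTo(v) >= 0
}

val transposed = Seq(
  StatsRow("a.parquet", 1, Map("name" -> "a1"), Map("name" -> "a1"), Map("name" -> 0L)),
  StatsRow("b.parquet", 1, Map("name" -> "a2"), Map("name" -> "a2"), Map("name" -> 0L)))

// The case 1 filters: isnotnull(name) and name = 'a1'
val queryFilters = Seq(IsNotNull("name"), EqualTo("name", "a1"))

// Filters are combined with AND, like reduce(And) in getCandidateFiles.
val pruned: Set[String] =
  transposed.filter(row => queryFilters.forall(f => translate(f)(row))).map(_.fileName).toSet
```

Only a.parquet survives. Its name range [a1, a1] contains 'a1' and its null count (0) is below its value count (1).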
For reference, getCandidateFiles is shown below:

private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression]): Set[String] = {
  val indexSchema = indexDf.schema
  // Translate every query filter into a predicate over the column stats, AND-ed together
  val indexFilter =
    queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)

  // Files whose column stats satisfy the translated filter
  val prunedCandidateFileNames =
    indexDf.where(new Column(indexFilter))
      .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
      .collect()
      .map(_.getString(0))
      .toSet

  // Files that have any entry in the column stats index
  val allIndexedFileNames =
    indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
      .collect()
      .map(_.getString(0))
      .toSet

  // Files without column stats cannot be pruned and are always kept
  val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)

  prunedCandidateFileNames ++ notIndexedFileNames
}

h3. 2. Queries containing columns not supported by column stats

Suppose the query is changed to:

select * from tbl where attributes.color = 'red'

Now queryReferencedColumns is attributes, and queryFilters are isnotnull(attributes#95) and (attributes#95[color] = red). Since there are no column stats for this column, transposedColStatsDF is empty. Whatever the query conditions are, prunedCandidateFileNames and allIndexedFileNames in getCandidateFiles are both empty, so notIndexedFileNames covers every file and all files are returned. The query result is therefore still correct.

h3. 3. Queries containing both columns supported and not supported by column stats

Now consider the query:

select * from tbl where attributes.color = 'red' and name = 'a1'

Here queryReferencedColumns includes attributes and name, and queryFilters combine the filters of cases 1 and 2: isnotnull(attributes#95), isnotnull(name#94), (attributes#95[color] = red) and (name#94 = a1).
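The set arithmetic at the end of getCandidateFiles explains why case 2 still returns every file. It can be modeled as below. This is a hypothetical sketch. `candidateFiles` and `allFiles` are illustrative names, and the sketch assumes lookupFileNamesMissingFromIndex returns the table's files that have no column stats entry.

```scala
// Hypothetical model of the final set arithmetic in getCandidateFiles.
// allFiles stands in for the table's file listing; notIndexed mirrors what
// lookupFileNamesMissingFromIndex is assumed to return.
def candidateFiles(
    allFiles: Set[String],
    indexedFiles: Set[String], // allIndexedFileNames
    prunedFiles: Set[String]   // prunedCandidateFileNames
): Set[String] = {
  val notIndexed = allFiles -- indexedFiles // files with no column stats at all
  prunedFiles ++ notIndexed
}

val allFiles = Set("a.parquet", "b.parquet")

// Case 1: both files are indexed and the name filter keeps only a.parquet.
val case1 = candidateFiles(allFiles, indexedFiles = allFiles, prunedFiles = Set("a.parquet"))

// Case 2: attributes has no column stats, so the transposed DataFrame is empty
// and every file falls into the "not indexed" bucket.
val case2 = candidateFiles(allFiles, indexedFiles = Set.empty, prunedFiles = Set.empty)
```

Files without any stats are always kept, which is what makes case 2 safe: an empty index can never prune anything.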
The content of transposedColStatsDF is shown below:

+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|            fileName|valueCount|attributes_minValue|attributes_maxValue|attributes_nullCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|3363b50e-ca25-4fb...|         1|               null|               null|                   1|           a2|           a2|             0|
|876af2ed-d529-4df...|         1|               null|               null|                   1|           a1|           a1|             0|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+

This shape comes from the transpose method called within loadTransposed, which builds the rows as follows. In case 1, targetIndexedColumns contains only name, so transposedRows only contains name. In case 2, colStatsRecords is empty, so the chained transformations have nothing to process and transposedRows is empty.

val transposedRows: HoodieData[Row] = colStatsRecords
  .filter(/* filter logic */)
  .mapToPair(/* map logic */)
  .groupByKey()
  .map(/* map logic */)

In case 3, colStatsRecords is not empty, so the function passed to the final map is executed. targetIndexedColumns is derived from indexedColumns, which is defined as follows:

private lazy val indexedColumns: Set[String] = {
  val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
  if (customIndexedColumns.isEmpty) {
    tableSchema.fieldNames.toSet
  } else {
    customIndexedColumns.asScala.toSet
  }
}

targetIndexedColumns is the intersection of targetColumnNames and indexedColumns. When no column stats column list has been configured, indexedColumns defaults to every field in the table schema (tableSchema.fieldNames.toSet).
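The default can be modeled with plain sets to show how it pulls attributes into targetIndexedColumns. The functions below are hypothetical stand-ins for the Hudi fields of the same names. The configured list in the second call (id and name) is an assumed example.

```scala
// Hypothetical model of how the columns to transpose are chosen; the function
// names mirror the Hudi fields discussed above but operate on plain sets.
def indexedColumns(tableFields: Seq[String], configured: Seq[String]): Set[String] =
  if (configured.isEmpty) tableFields.toSet // default: every field in the schema
  else configured.toSet                     // explicit column stats column list

def targetIndexedColumns(targetColumnNames: Set[String], indexed: Set[String]): Set[String] =
  targetColumnNames intersect indexed

val tableFields = Seq("id", "name", "attributes")
val queryColumns = Set("attributes", "name") // columns referenced in case 3

// No column list configured: the map column is treated as indexed.
val byDefault = targetIndexedColumns(queryColumns, indexedColumns(tableFields, Seq.empty))

// Column list configured (assumed here to be id and name).
val configured = targetIndexedColumns(queryColumns, indexedColumns(tableFields, Seq("id", "name")))
```

With the default, attributes ends up in targetIndexedColumns even though no stats exist for it. With an explicit list, it does not.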
Because indexedColumns defaults to every schema field, it includes the map-type attributes column. targetColumnNames contains the columns referenced by the query, so the intersection is still attributes and name.

Back in the code that builds transposedRows: since attributes has no column stats, its entry is filled with min = null, max = null and null_count = valueCount, so that the subsequent where filter does not fail on missing columns. This is where attributes_minValue, attributes_maxValue and attributes_nullCount in the resulting DataFrame come from.

Now look at how getCandidateFiles assembles indexFilter. The queryFilters above are processed by translateIntoColumnStatsIndexFilterExpr, which transforms each expression and defaults to True if it cannot be translated. The name filters are on a string column and are translated directly, so only the two attributes filters need attention.

The first, isnotnull(attributes#95), hits this case:

case IsNotNull(attribute: AttributeReference) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))

It is translated to 'attributes_nullCount < 'valueCount.

The second, (attributes#95[color] = red), should in principle match the EqualTo case, but it does not. AllowedTransformationExpression does not support GetMapValue, so the translation yields None and the expression falls back to True.

The translated expression for attributes is therefore 'attributes_nullCount < 'valueCount And True. Herein lies the problem:

acc ++= Seq(null, null, valueCount)

Since the null_count for attributes always equals valueCount, 'attributes_nullCount < 'valueCount is always False, and prunedCandidateFileNames is always empty. Meanwhile every file appears in indexDf, so allIndexedFileNames is not empty and notIndexedFileNames is empty.
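The whole case 3 failure can be reproduced with a small, self-contained model. It is hypothetical plain Scala, not Hudi's API. `MapValueEqualTo` stands in for the untranslatable `attributes[color] = red` expression, and `placeholder` mimics the `(null, null, valueCount)` padding described above.

```scala
// Hypothetical, simplified reproduction of case 3 (plain Scala, not Hudi's API).
sealed trait Filter
case class IsNotNull(col: String) extends Filter
case class EqualTo(col: String, value: String) extends Filter
// attributes['color'] = 'red': an expression the index cannot translate
case class MapValueEqualTo(col: String, key: String, value: String) extends Filter

case class ColStats(min: Option[String], max: Option[String], nullCount: Long)
case class StatsRow(fileName: String, valueCount: Long, stats: Map[String, ColStats])

// Untranslatable filters default to true, as described for
// translateIntoColumnStatsIndexFilterExpr.
def translate(f: Filter): StatsRow => Boolean = f match {
  case IsNotNull(c) => row => row.stats(c).nullCount < row.valueCount
  case EqualTo(c, v) => row => row.stats(c) match {
    case ColStats(Some(lo), Some(hi), _) => lo.compareTo(v) <= 0 && hi.compareTo(v) >= 0
    case _                               => false // null min/max never matches
  }
  case _: MapValueEqualTo => (_: StatsRow) => true
}

// Placeholder for a column without stats: (null, null, valueCount)
def placeholder(valueCount: Long): ColStats = ColStats(None, None, valueCount)

val indexDf = Seq(
  StatsRow("a.parquet", 1, Map("name" -> ColStats(Some("a1"), Some("a1"), 0), "attributes" -> placeholder(1))),
  StatsRow("b.parquet", 1, Map("name" -> ColStats(Some("a2"), Some("a2"), 0), "attributes" -> placeholder(1))))

val queryFilters = Seq(
  IsNotNull("attributes"), IsNotNull("name"),
  MapValueEqualTo("attributes", "color", "red"), EqualTo("name", "a1"))

val allFiles = Set("a.parquet", "b.parquet")
val pruned = indexDf.filter(r => queryFilters.forall(f => translate(f)(r))).map(_.fileName).toSet
val allIndexed = indexDf.map(_.fileName).toSet
val candidates = pruned ++ (allFiles -- allIndexed)
```

`candidates` comes out empty, although a.parquet holds the row (1, 'a1', {color -> red, size -> M}) that the query should return. The padded null count alone makes `IsNotNull("attributes")` false for every file.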
Consequently, getCandidateFiles always returns an empty set. This is a serious problem: the attributes condition does not merely fail to prune files. Because its null_count is always set equal to valueCount, the condition is always false and eliminates every file.

h3. 4. Explicitly specifying the indexed columns in case 3

Building on case 3, the problem does not occur if the columns to index are explicitly configured. In that case, the code below takes the metadataConfig.getColumnsEnabledForColumnStatsIndex branch, so indexedColumns does not contain any column that was not configured. Unsupported columns are therefore never added to indexDf, which contains only the configured columns.

private lazy val indexedColumns: Set[String] = {
  val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
  // Column Stats Index could index either
  // - The whole table
  // - Only configured columns
  if (customIndexedColumns.isEmpty) {
    tableSchema.fieldNames.toSet
  } else {
    customIndexedColumns.asScala.toSet
  }
}

At the same time, indexSchema is taken from the schema of indexDf, so it does not include any unsupported column either. Query conditions on unsupported columns therefore translate to True in indexFilter instead of producing an always-false predicate:

val indexSchema = indexDf.schema
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)

h3. Problem Summary

With the current implementation, data skipping can drop every file when it is enabled and the index columns have not been explicitly configured. If the query conditions include both columns supported by column stats and columns that are not, no data will ever be returned.
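Case 4 can be modeled the same way. This is a hypothetical plain-Scala sketch, not Hudi's API, under two assumptions: the index only carries the configured column (name), and any filter on a column absent from the index schema translates to true, as described above. Names such as `translate` and `MapValueEqualTo` are illustrative.

```scala
// Hypothetical model of case 4: indexDf only carries configured columns, and a
// filter on a column absent from the index schema translates to true.
sealed trait Filter { def col: String }
case class IsNotNull(col: String) extends Filter
case class EqualTo(col: String, value: String) extends Filter
case class MapValueEqualTo(col: String, key: String, value: String) extends Filter

case class ColStats(min: String, max: String, nullCount: Long)
case class StatsRow(fileName: String, valueCount: Long, stats: Map[String, ColStats])

def translate(f: Filter, indexSchema: Set[String]): StatsRow => Boolean =
  if (!indexSchema.contains(f.col)) ((_: StatsRow) => true) // cannot prune on this column
  else f match {
    case IsNotNull(c)       => row => row.stats(c).nullCount < row.valueCount
    case EqualTo(c, v)      => row => row.stats(c).min.compareTo(v) <= 0 && row.stats(c).max.compareTo(v) >= 0
    case _: MapValueEqualTo => (_: StatsRow) => true
  }

// Only name is indexed, so indexDf and its schema carry no attributes columns.
val indexDf = Seq(
  StatsRow("a.parquet", 1, Map("name" -> ColStats("a1", "a1", 0))),
  StatsRow("b.parquet", 1, Map("name" -> ColStats("a2", "a2", 0))))
val indexSchema: Set[String] = indexDf.flatMap(_.stats.keySet).toSet

val queryFilters = Seq(
  IsNotNull("attributes"), IsNotNull("name"),
  MapValueEqualTo("attributes", "color", "red"), EqualTo("name", "a1"))

val allFiles = Set("a.parquet", "b.parquet")
val pruned = indexDf.filter(r => queryFilters.forall(f => translate(f, indexSchema)(r))).map(_.fileName).toSet
val candidates = pruned ++ (allFiles -- indexDf.map(_.fileName).toSet)
```

Here the name filter alone prunes b.parquet and a.parquet is kept. File pruning is only a first pass: the attributes['color'] = 'red' condition is still applied by the compute layer to the rows read from a.parquet.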
h3. Problem Fix

The variables that directly cause the issue are indexDf and indexFilter:
* indexDf includes unsupported columns, filled with placeholder values that do not reflect the data.
* indexFilter contains query conditions that the index does not support, and indexDf is filled with those placeholder values to accommodate them.
* Moreover, indexFilter decides which conditions to translate by looking up columns in the schema derived from indexDf.

The root cause is therefore that indexDf is populated with unsupported columns.

The fix removes unsupported columns from the relevant schema before the column stats records are read. Two scenarios need to be considered:
* If no index columns are configured, every column that supports column stats is indexed. Removing the unsupported columns therefore removes exactly the columns that are not indexed, and query results are unaffected.
* If specific index columns are configured, some columns that do support column stats may still not be indexed, so the removed columns are not the complete set of non-indexed columns. As argued in case 4, this does not affect query results either.

Suppose a later change, or incomplete handling of some types, causes columns that do have indexes to be removed. At worst, this temporarily reduces the effectiveness of data skipping. The filter is still evaluated afterwards by the compute layer, so query results remain correct.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)