linliu-code opened a new issue, #18755:
URL: https://github.com/apache/hudi/issues/18755

   ## Describe the problem you faced
   
   When Hudi's column-stats MDT writes a file's stats and the underlying 
Parquet file omits column-chunk statistics (which `parquet-mr` does when at 
least one value exceeds the stats truncation threshold, e.g. a string > ~1 KB), 
Hudi mis-records the file's stats as `nullCount = valueCount` — semantically 
"all N values in this file are NULL."
   
   On query, Hudi's data-skipping reader uses this MDT record and concludes "no 
row in this file can match any non-null predicate," so the file is silently 
skipped. Equality predicates against any column of that file (including short, 
non-null values in the same file) return **zero rows**.
   
   This is silent data loss for any Hudi 1.1.1 table where:
   - col-stats MDT is enabled (the 1.1.1 default), and
   - at least one row in the file has a value larger than `parquet-mr`'s stats 
truncation threshold for its column type (~1 KB for binary/string).
   
   The bug is unrelated to RLI, bloom filters, partition stats, or NaN 
handling. It triggers purely on the col-stats writer path against any Parquet 
file where the writer omitted column-chunk stats.
   
   ## To Reproduce
   
   Single-file pyspark script. Run via spark-submit with the Hudi 1.1.1 Spark 
3.4 bundle:
   
   ```bash
   export HUDI_BUNDLE=/path/to/hudi-spark3.4-bundle_2.12-1.1.1.jar
   spark-submit \
     --master 'local[2]' \
     --jars "$HUDI_BUNDLE" \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
     --conf 
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
     --conf 
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog 
\
     repro.py
   ```
   
   ```python
   import os, shutil, tempfile, glob
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, StringType, 
IntegerType, LongType
   from pyspark.sql import functions as F
   
   PATH = tempfile.mkdtemp(prefix="hudi_colstats_long_string_")
   spark = 
SparkSession.builder.appName("repro").config("spark.sql.shuffle.partitions","1").getOrCreate()
   spark.sparkContext.setLogLevel("WARN")
   
   schema = StructType([
       StructField("rk", StringType(), False),
       StructField("val", IntegerType(), True),
       StructField("ts", LongType(), False),
   ])
   
   # Default Hudi 1.1.1 options: MDT is enabled, col-stats is on by default
   opts = {
       "hoodie.table.name": "repro",
       "hoodie.datasource.write.recordkey.field": "rk",
       "hoodie.datasource.write.partitionpath.field": "",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.metadata.enable": "true",
       "hoodie.parquet.small.file.limit": "0",
   }
   
   # Two rows: one short rk, one above the parquet-mr stats truncation threshold
   rows = [("Y", 1, 1), ("Y" * 4096, 2, 1)]
   spark.createDataFrame(rows, 
schema).write.format("hudi").options(**opts).mode("overwrite").save(PATH)
   
   # Hudi reader
   hudi = spark.read.format("hudi").load(PATH)
   print(f"Hudi count: {hudi.count()}")
   print(f"Hudi WHERE rk='Y': {hudi.where(F.col('rk') == F.lit('Y')).count()}  
(expected 1)")
   
   # Same file via raw Spark Parquet reader
   pq = sorted(glob.glob(f"{PATH}/*.parquet"))[0]
   print(f"Raw Parquet WHERE rk='Y': {spark.read.parquet(pq).where(F.col('rk') 
== F.lit('Y')).count()}  (expected 1)")
   
   # Inspect MDT col-stats
   mdt = os.path.join(PATH, ".hoodie", "metadata")
   spark.read.format("hudi").load(mdt).createOrReplaceTempView("vmdt")
   print("\nMDT col-stats for rk:")
   spark.sql("""
     SELECT ColumnStatsMetadata.fileName AS f,
            ColumnStatsMetadata.minValue, ColumnStatsMetadata.maxValue,
            ColumnStatsMetadata.nullCount, ColumnStatsMetadata.valueCount
     FROM vmdt WHERE type=3 AND ColumnStatsMetadata.columnName='rk'
   """).show(truncate=False)
   ```
   
   ## Expected behavior
   
   `WHERE rk = 'Y'` should return 1 row from both the Hudi reader and the raw 
Parquet reader — the short row IS in the file.
   
   The MDT col-stats record for `rk` should reflect that statistics are 
unavailable (e.g., `min=null, max=null` with `nullCount` left unset) so the 
data-skipping reader treats the file as "must scan." It should NOT claim 
`nullCount=2, valueCount=2`.
   
   ## Actual behavior
   
   ```
   Hudi count: 2
   Hudi WHERE rk='Y': 0                              <<< BUG (expected 1)
   Raw Parquet WHERE rk='Y': 1                        (expected 1)
   
   MDT col-stats for rk:
   +--------------------+--------+--------+---------+-----------+
   |f                   |minValue|maxValue|nullCount|valueCount |
   +--------------------+--------+--------+---------+-----------+
   |<file_id>.parquet   |NULL    |NULL    |2        |2          |
   +--------------------+--------+--------+---------+-----------+
   ```
   
   The MDT claims both rows are null. On query, Hudi's data-skipping reads this 
record, concludes "no row in this file is non-null, hence no row matches `rk = 
'Y'`", and skips the file.
   
   The same Parquet file read via `spark.read.parquet(...)` returns 1 row 
correctly — the data IS in the file; the bug is purely in Hudi's col-stats 
writer/reader path.
   
   ## Root cause
   
   
`hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java`,
 method `readColumnStatsFromMetadata`, line 320 (release-1.1.1):
   
   ```java
   return HoodieColumnRangeMetadata.create(
       filePath,
       columnChunkMetaData.getPath().toDotString(),
       convertToNativeJavaType(..., stats.genericGetMin(), valueMetadata),
       convertToNativeJavaType(..., stats.genericGetMax(), valueMetadata),
       // NOTE: In case when column contains only nulls Parquet won't be 
creating
       //       stats for it instead returning stubbed (empty) object. In that 
case
       //       we have to equate number of nulls to the value count ourselves
       stats.isEmpty() ? columnChunkMetaData.getValueCount() : 
stats.getNumNulls(),   // <-- BUG
       columnChunkMetaData.getValueCount(),
       columnChunkMetaData.getTotalSize(),
       columnChunkMetaData.getTotalUncompressedSize(),
       valueMetadata);
   ```
   
   The intent in the comment is correct, but `stats.isEmpty()` returns `true` 
in two semantically-different cases:
   
   | Parquet condition | `stats.isEmpty()` | `stats.isNumNullsSet()` | Correct 
nullCount |
   |---|---|---|---|
   | Column has all-null values | `true` | `true` | `valueCount` |
   | **Column has at least one value too large for stats (parquet-mr omits 
stats entirely)** | **`true`** | **`false`** | **unknown — should not be 
`valueCount`** |
   
   Hudi's current code treats both the same way, fabricating `nullCount = 
valueCount` in the second case. The data-skipping reader then correctly applies 
that record and skips the file, producing zero rows.
   
   `parquet-mr` exposes `Statistics.isNumNullsSet()` exactly to distinguish 
these cases.
   
   ## Suggested patch
   
   ```java
   // in readColumnStatsFromMetadata, replace line 320 with:
   long nullCount;
   if (stats.isEmpty()) {
       if (stats.isNumNullsSet()) {
           // Genuine "all values null" case: parquet recorded num_nulls 
explicitly
           // and chose not to write min/max for an all-null column.
           nullCount = columnChunkMetaData.getValueCount();
       } else {
           // "Statistics absent" case (e.g. a value exceeded the parquet-mr 
stats
           // truncation threshold). We have no information about nulls. Use the
           // unset sentinel so the data-skipping reader treats this column's
           // stats as unavailable and falls back to scanning.
           nullCount = -1;  // or whatever sentinel HoodieColumnRangeMetadata
                             // recognizes as "unknown"; see also the 
data-skipping
                             // reader's interpretation of nullCount.
       }
   } else {
       nullCount = stats.getNumNulls();
   }
   ```
   
   The data-skipping reader on the query side may also need a small change to 
recognize the "unknown nullCount" sentinel and skip stats-based pruning for 
that file × column (rather than treating `nullCount` of `-1` as "all values 
matching the predicate are absent").
   
   ## Cross-reference
   
   This is the same code-surface bug class as #18754 (NaN in numeric columns 
silently records `min=0.0, max=0.0`). Both arise from 
`ParquetUtils.readColumnStatsFromMetadata` mishandling exceptional `parquet-mr` 
`Statistics` states. A single PR could fix both:
   
   - **NaN case (#18754)**: `convertToNativeJavaType(..., 
stats.genericGetMin(), ...)` returns `0.0` when stats are absent due to NaN. 
The fix should produce `null` min/max and unset null-count.
   - **Long-value case (this issue)**: `stats.isEmpty() ? valueCount : 
getNumNulls()` returns `valueCount` when stats are absent due to truncation. 
The fix is shown above.
   
   ## Environment Description
   
   - Hudi version: **1.1.1** (current GA from Maven Central; 
`hudi-spark3.4-bundle_2.12-1.1.1.jar`)
   - Spark version: 3.4.3
   - Hadoop version: 3 (bundled with Spark)
   - Storage: local FS — bug is in the col-stats writer/reader and is 
storage-independent
   - Running on Docker?: optional
   
   ## Stacktrace
   
   n/a — no exception; the bug is silent.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to