linliu-code opened a new issue, #18754:
URL: https://github.com/apache/hudi/issues/18754

   ## Describe the problem you faced
   
   When a Hudi 1.1.1 table contains a file whose indexed `DOUBLE` (or `FLOAT`) 
column has any `NaN` value, two things happen:
   
   **(A) — long-standing latent bug, persisted in MDT.** The column-stats 
metadata table records that file's `min` and `max` for that column as `0.0` and 
`0.0`, instead of the real range (or `null`/"no stats" as the Parquet spec 
mandates for NaN-bearing columns). NaN is unordered in IEEE-754 (every ordered 
comparison involving NaN is false), so a correct implementation must either 
filter NaN out before computing min/max, or skip writing min/max entirely. Hudi 
instead fabricates `0.0` / `0.0` as the stored stats, values that do not appear 
in the data at all.
   
   **(B) — REGRESSION in 1.1.1.** In 0.15.x, the corrupted stats from (A) were 
latent: the read-path data-skipping logic apparently did not rely on them for 
pruning, so queries returned correct results despite the wrong on-disk stats. 
**In 1.1.1, the read path now uses those wrong stats**, causing silent wrong 
query results across the **whole table**, including for predicates on other 
columns and on files that contain no NaN at all.
   
   In the minimal 4-file reproduction below, a single NaN row in one column of 
one file causes all of the following queries to return **0 rows** instead of the 
correct 10 or 11 rows (`ON`/`OFF` refer to `hoodie.enable.data.skipping`):
   
   ```
   d_double > 250    ON=  0    OFF= 11    expected= 11    <<< BUG (silent wrong result)
   d_double >= 300   ON=  0    OFF= 11    expected= 11    <<< BUG
   s_str LIKE 'a%'   ON=  0    OFF= 10    expected= 10    <<< BUG  (file 0 has no NaN, query on unrelated column)
   s_str LIKE 'b%'   ON=  0    OFF= 10    expected= 10    <<< BUG  (file 1 has no NaN, query on unrelated column)
   s_str LIKE 'c%'   ON=  0    OFF= 10    expected= 10    <<< BUG  (file 2 has no NaN, query on unrelated column)
   ```
   
   Note the third/fourth/fifth lines: the broken queries are on the **string** 
column of files that contain **no NaN values whatsoever**. One NaN row in one 
column of one file taints data-skipping for the whole table.
   
   This is silent loss of results at query time: users receive incomplete 
result sets with no error, no warning, and no log message.
   
   ## To Reproduce
   
   A single-file PySpark script reproduces the issue; no Docker is required. 
The full script, `reproduce_hudi_colstats_bug.py`:
   
   ```python
   """Reproduce the Hudi col-stats NaN corruption + cascading wrong-result bug.
   
   Run:
      export HUDI_BUNDLE=/path/to/hudi-spark3.4-bundle_2.12-1.1.1.jar
      spark-submit \
        --master 'local[2]' \
        --jars "$HUDI_BUNDLE" \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
        --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
        --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
        reproduce_hudi_colstats_bug.py
   """
   import os
   import tempfile
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
   
   ROOT = tempfile.mkdtemp(prefix="hudi_cs_repro_")
   spark = (SparkSession.builder.appName("hudi_colstats_bug_repro")
       .config("spark.sql.shuffle.partitions", "1")
       .config("spark.sql.session.timeZone", "UTC").getOrCreate())
   spark.sparkContext.setLogLevel("WARN")
   
   schema = StructType([
       StructField("rk", IntegerType(), False),
       StructField("p",  StringType(),  False),
       StructField("s_str",    StringType(),  True),
       StructField("d_double", DoubleType(),  True),
   ])
   opts = {
       "hoodie.table.name": "hudi_cs_repro",
       "hoodie.datasource.write.recordkey.field": "rk",
       "hoodie.datasource.write.partitionpath.field": "p",
       "hoodie.datasource.write.precombine.field": "rk",
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.parquet.small.file.limit": "0",
       "hoodie.parquet.max.file.size": "1048576",
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "s_str,d_double",
   }
   
   # 4 files. Files 0..2 are clean (no NaN). File 3 has one NaN row.
   files = [
       [(10000+k, "P", f"a_{k:02d}", float(k))     for k in range(10)],
       [(20000+k, "P", f"b_{k:02d}", 100.0+k)      for k in range(10)],
       [(30000+k, "P", f"c_{k:02d}", 200.0+k)      for k in range(10)],
       [(40000+k, "P", f"d_{k:02d}", 300.0+k)      for k in range(10)] +
       [(40100,   "P", "m_nan",      float("nan"))],
   ]
   for i, rows in enumerate(files):
       # The first write creates the table; each later write appends.
       (spark.createDataFrame(rows, schema)
           .write.format("hudi").options(**opts)
           .mode("overwrite" if i == 0 else "append")
           .save(ROOT))
   
   # EFFECT 1: dump the column_stats partition of the MDT.
   mdt = os.path.join(ROOT, ".hoodie", "metadata")
   spark.read.format("hudi").load(mdt).createOrReplaceTempView("v_mdt")
   print("\nEffect 1 — stored col-stats per file for d_double:")
   for r in spark.sql("""
     SELECT ColumnStatsMetadata.fileName AS file,
            CAST(ColumnStatsMetadata.minValue.member4.value AS DOUBLE) AS min_dbl,
            CAST(ColumnStatsMetadata.maxValue.member4.value AS DOUBLE) AS max_dbl,
            ColumnStatsMetadata.nullCount AS nulls,
            ColumnStatsMetadata.valueCount AS vals
     FROM v_mdt WHERE type = 3 AND ColumnStatsMetadata.columnName = 'd_double'
     ORDER BY ColumnStatsMetadata.fileName
   """).collect():
       # The NaN-bearing file is the only one with 11 values.
       corrupted = r.vals == 11 and r.min_dbl == 0.0 and r.max_dbl == 0.0
       verdict = "  <<< CORRUPTED (NaN file stored as min=0, max=0)" if corrupted else ""
       print(f"  file={r.file.split('-')[0]:<10}  min={r.min_dbl:8.2f}  "
             f"max={r.max_dbl:8.2f}  nulls={r.nulls}/{r.vals}{verdict}")
   
   # EFFECT 2: queries with data-skipping=ON vs OFF.
   print("\nEffect 2 — same query, data-skipping ON vs OFF:")
   for label, sql, expected in [
       ("d_double > 250  (file 3 -> 11 rows)", "d_double > 250", 11),
       ("d_double >= 300 (file 3 -> 11 rows)", "d_double >= 300", 11),
       ("s_str LIKE 'a%' (file 0 -> 10 rows; no NaN in file 0)", "s_str LIKE 'a%'", 10),
       ("s_str LIKE 'b%' (file 1 -> 10 rows; no NaN in file 1)", "s_str LIKE 'b%'", 10),
       ("s_str LIKE 'c%' (file 2 -> 10 rows; no NaN in file 2)", "s_str LIKE 'c%'", 10),
   ]:
       # Run the same count twice, reloading the table after each config change.
       spark.conf.set("hoodie.enable.data.skipping", "true")
       spark.read.format("hudi").load(ROOT).createOrReplaceTempView("v_t")
       on = spark.sql(f"SELECT count(*) AS c FROM v_t WHERE {sql}").collect()[0].c
       spark.conf.set("hoodie.enable.data.skipping", "false")
       spark.read.format("hudi").load(ROOT).createOrReplaceTempView("v_t")
       off = spark.sql(f"SELECT count(*) AS c FROM v_t WHERE {sql}").collect()[0].c
       flag = "  <<< BUG (silent wrong result)" if on != off else "  ok"
       print(f"  {label}\n      data-skipping ON={on:2d}  OFF={off:2d}  "
             f"expected={expected}{flag}")
   ```
   
   ## Expected behavior
   
   **Effect 1.** The NaN-bearing file's stats should follow the Parquet spec 
convention: either store `(min, max)` excluding NaN, or omit min/max entirely 
(signaling "no usable stats, do not prune").
   
   **Effect 2.** With `hoodie.enable.data.skipping=true`, queries on **any** 
column should never return fewer rows than `hoodie.enable.data.skipping=false`. 
The two modes should be equivalent — skipping is supposed to be a transparent 
performance optimization, never a correctness change.
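
The equivalence stated above can be turned into a small check. The following is a minimal sketch, not part of Hudi: `find_skipping_mismatches` and the `count_rows(predicate, skipping_enabled)` callable are hypothetical names, and the stub below stands in for a real Spark query runner by replaying two of the counts reported in this issue.

```python
from typing import Callable, Iterable, List, Tuple


def find_skipping_mismatches(
    count_rows: Callable[[str, bool], int],
    predicates: Iterable[str],
) -> List[Tuple[str, int, int]]:
    """Return (predicate, count_on, count_off) for every predicate whose
    row count differs between data-skipping ON and OFF."""
    mismatches = []
    for predicate in predicates:
        on = count_rows(predicate, True)    # skipping enabled
        off = count_rows(predicate, False)  # skipping disabled
        if on != off:
            mismatches.append((predicate, on, off))
    return mismatches


# Stub runner (assumption): replays counts reported for 1.1.1 in this issue
# instead of querying a real table.
reported = {
    ("d_double > 250", True): 0,
    ("d_double > 250", False): 11,
    ("s_str LIKE 'a%'", True): 0,
    ("s_str LIKE 'a%'", False): 10,
}


def fake_count_rows(predicate: str, skipping_enabled: bool) -> int:
    return reported[(predicate, skipping_enabled)]


mismatches = find_skipping_mismatches(
    fake_count_rows, ["d_double > 250", "s_str LIKE 'a%'"]
)
for predicate, on, off in mismatches:
    print(f"{predicate}: ON={on} OFF={off}")
```

In a regression test, `count_rows` would wrap the ON/OFF queries from the reproduction script, and the test would assert that the returned list is empty.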
   
   ## Actual behavior
   
   Against `hudi-spark3.4-bundle_2.12-1.1.1.jar` (current GA from Maven 
Central):
   
   ```
   Effect 1 — stored col-stats per file for d_double:
     file=dd886af5  min=   -0.00  max=    9.00  nulls=0/10
     file=8c40b728  min=  100.00  max=  109.00  nulls=0/10
     file=125d33b3  min=  200.00  max=  209.00  nulls=0/10
     file=30306ce2  min=    0.00  max=    0.00  nulls=0/11  <<< CORRUPTED (NaN file stored as min=0, max=0)
   
   Effect 2 — same query, data-skipping ON vs OFF:
     d_double > 250                                                ON= 0   OFF=11   expected=11   <<< BUG
     d_double >= 300                                               ON= 0   OFF=11   expected=11   <<< BUG
     s_str LIKE 'a%'  (file 0 -> 10 rows; no NaN in file 0)        ON= 0   OFF=10   expected=10   <<< BUG
     s_str LIKE 'b%'  (file 1 -> 10 rows; no NaN in file 1)        ON= 0   OFF=10   expected=10   <<< BUG
     s_str LIKE 'c%'  (file 2 -> 10 rows; no NaN in file 2)        ON= 0   OFF=10   expected=10   <<< BUG
   ```
   
   The third/fourth/fifth lines are the most striking: the silent wrong-result 
happens on a **string** column, against files that contain **no NaN values at 
all** — but the table as a whole contains one NaN row elsewhere, and that's 
enough to taint data-skipping decisions for everything.
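
To make the `d_double` rows concrete, here is a minimal sketch of generic min/max range pruning applied to the stored stats printed under Effect 1. It is an illustration only: `FileStats` and `may_contain_greater_than` are hypothetical names, this is not Hudi's actual pruning code, and it does not attempt to explain why the `s_str` queries are also affected.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Hypothetical per-file column stats: a stored min and max."""
    name: str
    min_value: float
    max_value: float


def may_contain_greater_than(stats: FileStats, literal: float) -> bool:
    """Range pruning for `col > literal`: keep a file only if its max exceeds the literal."""
    return stats.max_value > literal


# Stored stats for d_double, as printed under Effect 1.
stored = [
    FileStats("dd886af5", -0.0, 9.0),
    FileStats("8c40b728", 100.0, 109.0),
    FileStats("125d33b3", 200.0, 209.0),
    FileStats("30306ce2", 0.0, 0.0),  # NaN-bearing file; real values are 300..309 plus NaN
]

# Same table, but with the NaN-bearing file's stats computed over non-NaN values.
corrected = stored[:3] + [FileStats("30306ce2", 300.0, 309.0)]

kept_stored = [s.name for s in stored if may_contain_greater_than(s, 250.0)]
kept_corrected = [s.name for s in corrected if may_contain_greater_than(s, 250.0)]
print(kept_stored)     # every file is pruned for d_double > 250
print(kept_corrected)  # only the file holding 300..309 survives
```

Under this simplified model, the fabricated `0.0`/`0.0` range is enough on its own to prune the only file that matches `d_double > 250`.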
   
   ## Cross-version matrix
   
   Same script, same Spark 3.4.3, only swapping the `--jars` bundle:
   
   | Bundle | Effect 1 (NaN → 0.0/0.0 stored in MDT) | Effect 2 (silent wrong results, minimal repro) |
   |---|---|---|
   | `hudi-spark3.4-bundle_2.12-0.15.0.jar` | **reproduces** | does NOT reproduce (0/5) |
   | `hudi-spark3.4-bundle_2.12-0.15.1-rc1.jar` | **reproduces** | does NOT reproduce (0/5) |
   | `hudi-spark3.4-bundle_2.12-1.1.1.jar` | **reproduces** | **reproduces 5/5** |
   
   So Effect 1 is a long-standing writer-side bug (stats stored wrong since at 
least 0.15.0). Effect 2 is **new in 1.1.1** — the corrupted stats that 0.15.x 
stored but ignored on read are now actively used by 1.1.1's data-skipping path, 
silently dropping result rows.
   
   In 0.15.x this was a latent landmine; in 1.1.1 the landmine reliably 
detonates on the simplest possible NaN-bearing table.
   
   ## Environment Description
   
   - Hudi version: 1.1.1 (current GA from Maven Central; cross-checked against 
0.15.0 GA and 0.15.1-rc1 staging)
   - Spark version: 3.4.3 (`hudi-spark3.4-bundle_2.12`, Scala 2.12)
   - Hadoop version: 3 (bundled Spark distribution)
   - Storage: local FS (Linux), but the bug is in the MDT writer and read-path 
pruner and is storage-independent
   - Running on Docker?: optional
   
   ## Additional context
   
   - The root cause looks like a writer-side oversight: in Java, 
`Math.min(NaN, x)` and `Math.max(NaN, x)` both return `NaN`, so a running 
min/max gets "stuck at NaN" the moment a NaN appears. The fabricated `0.0` 
likely comes from a downstream defaulting step (Avro union-with-default-zero or 
`Optional<Double>` unwrap with default).
   - `NULL` is correctly handled: a column with `{NULL, -Inf, +Inf, 300..309}` 
produces `min=-Inf, max=+Inf, nullCount=1` — i.e. NULL is filtered out of 
min/max as expected. So the bug is specific to `NaN`, not to "any special 
value".
   - ±Inf alone (no NaN) is also correctly handled: a file with `{-Inf, +Inf, 
300..309}` stores `min=-Inf, max=+Inf` correctly. So the NaN trigger is the 
smoking gun, not Inf.
   - NaN appears in real-world data more often than people expect: division by 
zero in upstream transforms (`0.0/0.0`), `sqrt(-x)`, ML feature outputs, JSON 
`NaN` deserialized via permissive parsers, sentinel-for-missing values in 
scientific datasets, etc.
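
The "stuck at NaN" behavior described in the first bullet can be sketched in Python. Python's built-in `min`/`max` are comparison-based and do not propagate NaN, so the helpers below are hypothetical stand-ins that mimic the NaN-propagating semantics of Java's `Math.min`/`Math.max` (ignoring their signed-zero handling). This illustrates the suspected mechanism only; it is not taken from Hudi's writer.

```python
import math


def nan_propagating_min(a: float, b: float) -> float:
    """Mimic Java's Math.min for doubles: NaN if either argument is NaN."""
    if math.isnan(a) or math.isnan(b):
        return math.nan
    return min(a, b)


def nan_propagating_max(a: float, b: float) -> float:
    """Mimic Java's Math.max for doubles: NaN if either argument is NaN."""
    if math.isnan(a) or math.isnan(b):
        return math.nan
    return max(a, b)


# Values like the NaN-bearing file, with the NaN in the middle of the stream.
values = [300.0 + k for k in range(5)] + [math.nan] + [305.0 + k for k in range(5)]

running_min, running_max = math.inf, -math.inf
for v in values:
    running_min = nan_propagating_min(running_min, v)
    running_max = nan_propagating_max(running_max, v)

# Once NaN enters, later finite values can never replace it.
print(running_min, running_max)

# NaN is unordered: every ordered comparison is False, and it is not equal to itself.
print(math.nan < 0.0, math.nan > 0.0, math.nan == math.nan)
```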
   
   ## Suggested fix direction
   
   Writer side: when computing per-file column stats, exclude NaN from the 
min/max accumulator (mirroring `parquet-mr`'s `DoubleStatistics` behavior since 
1.11). If every value in the column is NaN, omit min/max entirely so the 
read-path pruner falls back to "no skip".
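
A minimal sketch of the writer-side idea, assuming a plain Python iterable stands in for one column of one file. `nan_aware_min_max` is a hypothetical helper, not Hudi or Parquet code; it skips both NULL (`None`) and NaN, and returns `None` when no usable value remains so that a caller could omit min/max.

```python
import math
from typing import Iterable, Optional, Tuple


def nan_aware_min_max(
    values: Iterable[Optional[float]],
) -> Optional[Tuple[float, float]]:
    """Return (min, max) over non-null, non-NaN values, or None if there are none."""
    lo: Optional[float] = None
    hi: Optional[float] = None
    for v in values:
        if v is None or math.isnan(v):
            continue  # NULL and NaN never participate in min/max
        if lo is None or v < lo:
            lo = v
        if hi is None or v > hi:
            hi = v
    if lo is None or hi is None:
        return None  # signal "no usable stats, do not prune"
    return (lo, hi)


# The NaN-bearing file from the reproduction: 300..309 plus one NaN.
file3 = [300.0 + k for k in range(10)] + [math.nan]
print(nan_aware_min_max(file3))

# A column that is entirely NaN yields no stats at all.
print(nan_aware_min_max([math.nan, math.nan]))
```

Returning `None` rather than a placeholder number keeps "no stats" distinguishable from a genuine range such as `(0.0, 0.0)`.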
   
   Read side: even with a writer-side fix, existing tables already have 
corrupted stats persisted in their MDT. Either (a) detect "suspicious" stats 
(e.g., min==max==0.0 with a non-zero valueCount) and disable skipping for that 
file, or (b) provide a documented rebuild procedure for the col-stats partition.
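
Option (a) could look roughly like the following sketch. `is_suspicious_float_stats` is a hypothetical name, and the rule is exactly the heuristic stated above, so it also flags files whose values are genuinely all `0.0`. That false positive only costs a skipped optimization, not correctness.

```python
from typing import Optional


def is_suspicious_float_stats(
    min_value: Optional[float],
    max_value: Optional[float],
    value_count: int,
) -> bool:
    """Heuristic: min == max == 0.0 with a non-zero value count.

    A True result means "do not prune this file based on these stats".
    """
    return value_count > 0 and min_value == 0.0 and max_value == 0.0


# Stats as printed under Effect 1 in this issue: (min, max, valueCount).
print(is_suspicious_float_stats(0.0, 0.0, 11))     # NaN-bearing file 30306ce2
print(is_suspicious_float_stats(-0.0, 9.0, 10))    # clean file dd886af5
print(is_suspicious_float_stats(100.0, 109.0, 10)) # clean file 8c40b728
```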
   
   ## Workarounds available today
   
   1. Replace `NaN` with `NULL` on ingest for any indexed numeric column. 
(`NULL` is handled correctly.)
   2. Remove the column from `hoodie.metadata.index.column.stats.column.list`.
   3. Disable data-skipping at query time: `hoodie.enable.data.skipping=false`. 
Defeats the purpose of the col-stats feature but guarantees correctness.
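
Workaround 1 can be sketched for rows shaped like the tuples in the reproduction script. `nan_to_none` is a hypothetical helper; it assumes the rows are plain Python tuples that are later passed to `spark.createDataFrame` with a nullable column, so that `None` is stored as `NULL`.

```python
import math
from typing import Any, Iterable, List, Tuple


def nan_to_none(rows: Iterable[Tuple[Any, ...]]) -> List[Tuple[Any, ...]]:
    """Replace every float NaN field with None; leave all other fields untouched."""
    return [
        tuple(None if isinstance(v, float) and math.isnan(v) else v for v in row)
        for row in rows
    ]


# Two rows in the same shape as the reproduction: (rk, p, s_str, d_double).
rows = [
    (40009, "P", "d_09", 309.0),
    (40100, "P", "m_nan", float("nan")),
]
clean = nan_to_none(rows)
print(clean)
```

This changes the stored data (NaN becomes NULL), so it only suits pipelines where the two are interchangeable downstream.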
   
   ## Stacktrace
   
   n/a — silent wrong result, no exception, no warning, no log line.
   

