linliu-code opened a new issue, #18769:
URL: https://github.com/apache/hudi/issues/18769

   
   ### Summary
   
   For a `SELECT count(*)` query against a COW table, Hudi's
   `HoodieFileGroupReaderBasedFileFormat` routes through the vectorized
   Parquet reader and reads each base file's content in full, even though
   the query only needs row counts. Spark's native `ParquetFileFormat`
   short-circuits this case by summing row counts from each file's footer
   metadata, reading only a few KB per file. As a result, `count(*)` on
   Hudi tables runs 2-3× slower than the same query on the equivalent raw
   Parquet directory, and reads two orders of magnitude more bytes per
   iteration.
   
   ### Reproduction
   
   Environment:
   - Hudi 1.1.1 (`org.apache.hudi:hudi-spark3.4-bundle_2.12:1.1.1`)
   - Spark 3.4.3
   - Java 11 Corretto
   - COW table, MDT enabled, col-stats index enabled, data skipping enabled
   
   Test script (Python / PySpark):
   
   ```python
   import time, os, shutil
   from pyspark.sql import SparkSession
   from pyspark.sql.types import *
   
   spark = (SparkSession.builder.appName("hudi-count-repro")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.sql.extensions",
               "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
       .config("spark.kryo.registrator",
               "org.apache.spark.HoodieSparkKryoRegistrar")
       .getOrCreate())
   jvm = spark._jvm
   
   # Build the same data both ways
   N, ROWS_PER = 100, 10000
   rows = [(i*ROWS_PER+k, f"P_{i:05d}", i*ROWS_PER+k, 1)
           for i in range(N) for k in range(ROWS_PER)]
   schema = StructType([
       StructField("rk",  IntegerType(), False),
       StructField("p",   StringType(),  False),
       StructField("val", IntegerType(), True),
       StructField("ts",  LongType(),    False)])
   
   # Hudi COW write
   shutil.rmtree("/tmp/hudi_table", ignore_errors=True)
   spark.createDataFrame(rows, schema).repartition("p").write.format("hudi").options(**{
       "hoodie.table.name": "t",
       "hoodie.datasource.write.recordkey.field": "rk",
       "hoodie.datasource.write.partitionpath.field": "p",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.metadata.enable": "true",
       "hoodie.metadata.index.column.stats.enable": "true",
       "hoodie.metadata.index.column.stats.column.list": "rk",
       "hoodie.parquet.max.file.size": "67108864",
   }).mode("overwrite").save("/tmp/hudi_table")
   
   # Raw parquet write — same data
   shutil.rmtree("/tmp/raw_parquet", ignore_errors=True)
   (spark.createDataFrame(rows, schema).repartition("p")
        .write.partitionBy("p").mode("overwrite").parquet("/tmp/raw_parquet"))
   
   def fs_bytes_read():
       s = jvm.org.apache.hadoop.fs.FileSystem.getAllStatistics()
       it = s.iterator()
       total = 0
       while it.hasNext():
           total += it.next().getBytesRead()
       return total
   
   def measure(path, fmt):
       spark.conf.set("hoodie.enable.data.skipping", "true")
       # Parenthesized so the conditional expression can span two lines
       df = (spark.read.format(fmt).load(path) if fmt == "hudi"
             else spark.read.parquet(path))
       df.createOrReplaceTempView("vq")
       spark.sql("SELECT count(*) FROM vq").collect()  # warmup
       walls, deltas = [], []
       for _ in range(3):
           before = fs_bytes_read()
           t0 = time.perf_counter()
           spark.sql("SELECT count(*) FROM vq").collect()
           walls.append((time.perf_counter() - t0) * 1000.0)
           deltas.append(fs_bytes_read() - before)
       return walls, deltas
   
   print("hudi:", measure("/tmp/hudi_table", "hudi"))
   print("raw: ", measure("/tmp/raw_parquet", "parquet"))
   ```
   
   ### Measurements
   
   Three scales, identical row content on both sides, 3 iterations each, median
   reported:

   | Scale | partitions × rows/part | Hudi bytesRead | Raw bytesRead | bytesRead ratio | Hudi wall | Raw wall | Wall ratio |
   |---|---|---|---|---|---|---|---|
   | Tiny  | 1,000 × 10    | 882 MB | 3.1 MB | **285×** | 857 ms | 310 ms | 2.76× |
   | Mid   | 1,000 × 1,000 | 882 MB | 2.2 MB | **395×** | 791 ms | 314 ms | 2.52× |
   | Large | 100 × 10,000  | 88 MB  | 376 KB | **235×** | 122 ms | 56 ms  | 2.18× |
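
As a quick sanity check on the ratio columns, the sketch below recomputes them from the rounded medians in the table. The 1 MB = 1000 KB conversion is an assumption (the issue does not state the unit convention), so the bytesRead ratios only land within a few percent of the published ones, which were presumably computed from unrounded byte counts. The wall-clock ratios reproduce to two decimals.

```python
# Medians copied from the table above; sizes normalised to KB.
# Assumption: 1 MB = 1000 KB (the unit convention is not stated in the issue).
measurements = {
    "Tiny":  {"hudi_kb": 882_000, "raw_kb": 3_100, "hudi_ms": 857, "raw_ms": 310},
    "Mid":   {"hudi_kb": 882_000, "raw_kb": 2_200, "hudi_ms": 791, "raw_ms": 314},
    "Large": {"hudi_kb": 88_000,  "raw_kb": 376,   "hudi_ms": 122, "raw_ms": 56},
}


def ratios(m):
    """Return (bytesRead ratio, wall-clock ratio) for one scale."""
    return m["hudi_kb"] / m["raw_kb"], m["hudi_ms"] / m["raw_ms"]


summary = {scale: ratios(m) for scale, m in measurements.items()}
for scale, (bytes_ratio, wall_ratio) in summary.items():
    print(f"{scale}: bytesRead {bytes_ratio:.0f}x, wall {wall_ratio:.2f}x")
```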
   
   Read amplification (bytesRead / on-disk size for each side):
   
   | Scale | Hudi amp | Raw amp |
   |---|---|---|
   | Tiny  | **2.03×** | 3.04× |
   | Mid   | **1.99×** | 0.25× |
   | Large | **1.65×** | 0.05× |
   
   **Across all three scales, Hudi reads roughly 1.65-2× the on-disk size on
   every iteration.** Raw Parquet's amplification drops to 0.05× at the Large
   scale because it reads only the file footers (a few KB each), not the data.
   Hudi is reading file content.
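
For anyone re-running the comparison, read amplification can be derived from the `bytesRead` deltas of the test script plus the on-disk size of each directory. The helper below is a minimal sketch and its names (`on_disk_bytes`, `read_amplification`) are hypothetical. It assumes that only `*.parquet` files count toward on-disk size; the issue does not say which files were included in the denominator, so the result may differ from the table if, for example, the `.hoodie` directory was counted.

```python
import os


def on_disk_bytes(root, suffix=".parquet"):
    """Sum the sizes of all files under `root` whose names end with `suffix`."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.endswith(suffix):
                total += os.path.getsize(os.path.join(dirpath, name))
    return total


def read_amplification(bytes_read, root, suffix=".parquet"):
    """Return bytes_read divided by the on-disk size of matching files."""
    size = on_disk_bytes(root, suffix)
    if size == 0:
        raise ValueError(f"no files ending in {suffix!r} under {root!r}")
    return bytes_read / size
```

With the test script, this would be called as `read_amplification(deltas[i], "/tmp/hudi_table")` for each measured iteration.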
   
   ### Root cause
   
   `HoodieFileGroupReaderBasedFileFormat.scala` (line 197 of the 1.1.1 source)
   recognizes the `count(*)` case via:
   
   ```scala
   val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
   ```
   
   But the path it routes to (`readBaseFile` at line 282 / 290) doesn't act on
   `isCount`:
   
   ```scala
   case _ =>
     readBaseFile(file, baseFileReader.value, requestedSchema, ...)
   ```
   
   `readBaseFile` (line 385) calls `parquetFileReader.read(...)`, which goes
   through `Spark34ParquetReader.doRead` →
   `VectorizedParquetRecordReader.initialize()` → full data read.
   
   Additionally, line 213 injects mandatory meta-fields into `requestedSchema`:
   
   ```scala
   val requestedSchema = StructType(
     requiredSchema.fields ++
       partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
   ```
   
   So even if a downstream check looks at `requestedSchema.isEmpty`, it's
   non-empty by the time it reaches the reader.
   
   Compare with Spark 3.4's `ParquetFileFormat.buildReaderWithPartitionValues`,
   which relies on the upstream optimizer's row-count fast path. Specifically,
   Spark's `Optimizer` rewrites `count(*)` over `LogicalRelation(parquet)` such
   that the row counts come from per-file `BlockMetaData.getRowCount()` summed
   across row groups, never opening the vectorized reader.
   
   ### Suggested fix
   
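   When `isCount=true` at line 197, take a fast path: open each file's Parquet
   footer, sum `BlockMetaData.getRowCount()` over `ParquetMetadata.getBlocks()`,
   and return the total via a synthetic single-row iterator. The reader already
   opens the footer through
   `ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)`, so
   the actual data bytes never need to be touched. Note that the count needs the
   row-group metadata, which `SKIP_ROW_GROUPS` omits, so the fast path has to
   read the footer with row groups included (the sketch below uses `NO_FILTER`).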
   
   Sketch:
   
   ```scala
   case _ if isCount =>
     val footer = ParquetFooterReader.readFooter(
       storageConf.unwrap(), file.toPath, NO_FILTER)
     val rowCount = footer.getBlocks.asScala.map(_.getRowCount).sum
     // Return a synthetic InternalRow representing the per-file count
     // (Spark's WSCG count-star aggregator does the rest)
     Iterator.single(InternalRow.fromSeq(Seq(rowCount)))
   ```
   
   (Exact integration with Spark's count aggregator codegen will need care:
   Spark's path uses `inputRDDs`-level filters, not a per-file count, so the
   cleanest implementation may delegate to the equivalent code in Spark's
   `OptimizeMetadataOnlyQuery` or its successor.)
   
   ### Expected impact of the fix
   
   Based on the measurements above, fixing this should:
   - Drop Hudi's bytesRead by ~285× at the smallest scale, ~235× at large scale.
   - Drop wall-clock by ~2-3× across all tested scales.
   - For tables with millions of base files in production, this is a meaningful
     absolute time saving for any query that boils down to `count(*)` (BI
     dashboards, observability metrics).
   
   ### Spark version coverage
   
   This is **not Spark-3.4-specific**. The `isCount` gate sits in shared common
   code at
   `hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:197`,
   not in any version-specific reader. The version-specific readers
   (`Spark33ParquetReader`, `Spark34ParquetReader`, `Spark35ParquetReader`,
   `Spark40ParquetReader`) all receive the routed `readBaseFile` call without
   the `isCount` flag, and none of them have an internal count fast-path.
   
   Spark 3.5's reader (`Spark35ParquetReader.scala` lines 104-111) actually
   reads *more* per footer than 3.4 in the default vectorized path:
   
   ```scala
   val originalFooter = if (enableVectorizedReader) {
     ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
   } else {
     ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
   }
   ```
   
   Spark 3.4, by contrast, always uses `SKIP_ROW_GROUPS`. So if anything the
   per-file overhead is marginally higher on Spark 3.5, but the dominant cost
   (reading file content via the vectorized reader) is the same.

   The measurements above were captured against
   `hudi-spark3.4-bundle_2.12:1.1.1`; the same numbers should reproduce against
   the 3.3 / 3.5 / 4.0 bundles.
   
   ### Caveats and scope
   
   - This issue is **scoped to COW + count(*)**. MOR tables go through
     `HoodieFileGroupReader` and have log files to merge, so a footer-count fast
     path doesn't apply.
   - Incremental queries are also out of scope (`!isIncremental` in the gate).
   - The fix does not affect any case where `requiredSchema` is non-empty.
   - **`SELECT * ... LIMIT 1` is out of scope** even though it shows similar
     bytesRead amplification. That query only opens one file (Spark's
     `LimitPushDown` does its job), so the cause is Hudi's per-file size bloat
     plus ~2× read amplification on a single file, not a missing query-shape
     optimization. The fix proposed here is keyed on `isCount=true` and wouldn't
     apply. It is worth a follow-up issue, but the mechanism is distinct.
   - The per-file size bloat of Hudi base files (5 `_hoodie_*` meta-fields,
     bloom filter, embedded col-stats) is a separate matter from this issue and
     not in scope. It is working as designed and amortizes at production row
     counts.
   
   ### Related
   
   Three correctness bugs were filed during a recent audit, all touching
   adjacent surfaces:

   - apache/hudi#18752: Spark write path silently ignores
     `outputTimestampType` (write-side correctness)
   - apache/hudi#18754: NaN corrupts col-stats and silently drops query rows
     (col-stats correctness)
   - apache/hudi#18755: Col-stats mis-records `nullCount=valueCount` when
     Parquet stats absent (col-stats correctness)

   This issue is performance-shaped, not correctness-shaped, and is separate
   from those three.
   
   ### Environment
   
   - Hudi: 1.1.1
   - Spark: 3.4.3
   - Java: 11.0.x Corretto
   - Platform: macOS arm64 (also reproduces in Linux x86 Docker container)
   

