linliu-code opened a new issue, #18752:
URL: https://github.com/apache/hudi/issues/18752

   ## Describe the problem you faced
   
   When writing a Hudi table via the Spark DataSource, the Parquet logical type of Spark `TimestampType` columns is always `TIMESTAMP(MICROS, isAdjustedToUTC=true)`, regardless of what the user requests via:
   
   1. SparkSession config `spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS` (or `INT96`), or
   2. The documented Hudi write option `hoodie.parquet.outputtimestamptype=TIMESTAMP_MILLIS`.
   
   In contrast, Spark's own Parquet writer (`df.write.parquet(...)`) under the **same SparkSession** honors both `TIMESTAMP_MILLIS` and `INT96`.
   
   This means Hudi-written tables cannot:
   - store timestamps as `TIMESTAMP(MILLIS, …)` for downstream consumers that expect millisecond-precision Parquet, or
   - store timestamps as `INT96` for legacy Hive/Impala readers that require INT96.
   
   Data values round-trip correctly (MICROS is the higher-precision encoding, so no value information is lost), but the mismatch silently breaks (a) storage workflows targeting MILLIS for smaller files and (b) interop with readers requiring INT96. It also means the documented `hoodie.parquet.outputtimestamptype` config has no effect.
   
   `TimestampNTZType` is **not** affected: Spark itself also ignores `outputTimestampType` for NTZ, so Hudi matching that behavior is correct.
   
   ## To Reproduce
   
   Single-file pyspark script — no Docker required.
   
   ```bash
   export HUDI_BUNDLE=/path/to/hudi-spark3.4-bundle_2.12-<VERSION>.jar
   spark-submit \
     --master 'local[2]' \
     --jars "$HUDI_BUNDLE" \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
     --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
     repro.py
   ```
   
   ```python
   import datetime, glob, os, shutil, tempfile
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
   
   WORK = tempfile.mkdtemp(prefix="hudi_ts_repro_")
   ROW = (1, datetime.datetime(2026, 5, 15, 12, 34, 56, 123456, tzinfo=datetime.timezone.utc))
   SCHEMA = StructType([
       StructField("id", IntegerType(), False),
       StructField("ts_tz", TimestampType(), False),
   ])
   HUDI_OPTS = {
       "hoodie.table.name": "ts_repro",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.precombine.field": "id",
       "hoodie.datasource.write.partitionpath.field": "",
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
   }
   
   def parquet_logical_type(d, col):
       spark = SparkSession.builder.getOrCreate()
       jvm = spark._jvm
       pq = sorted(glob.glob(f"{d}/**/*.parquet", recursive=True))[0]
       reader = jvm.org.apache.parquet.hadoop.ParquetFileReader.open(
           jvm.org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(
               jvm.org.apache.hadoop.fs.Path(pq), spark._jsc.hadoopConfiguration()))
       s = reader.getFileMetaData().getSchema().toString()
       reader.close()
       for line in s.splitlines():
           line = line.strip()
           if f" {col};" in line or (f" {col} " in line and "(" in line):
               return line[line.find(col):].rstrip(";").strip()
       return "(not found)"
   
   def run(label, out_ts):
       spark = (SparkSession.builder.appName(f"r_{label}")
           .config("spark.sql.session.timeZone", "UTC")
           .config("spark.sql.parquet.outputTimestampType", out_ts)
           .getOrCreate())
       spark.sparkContext.setLogLevel("WARN")
       df = spark.createDataFrame([ROW], SCHEMA)
       sp = os.path.join(WORK, f"{label}_spark")
       hp = os.path.join(WORK, f"{label}_hudi")
       shutil.rmtree(sp, ignore_errors=True)
       shutil.rmtree(hp, ignore_errors=True)
       df.coalesce(1).write.mode("overwrite").parquet(sp)
       df.write.format("hudi").options(**HUDI_OPTS).mode("overwrite").save(hp)
       print(f"  {label}: Spark-direct = {parquet_logical_type(sp, 'ts_tz')}   |   Hudi = {parquet_logical_type(hp, 'ts_tz')}")
       spark.stop()
   
   run("session_MILLIS", "TIMESTAMP_MILLIS")
   run("session_MICROS", "TIMESTAMP_MICROS")
   run("session_INT96",  "INT96")
   ```
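The script above only sets the session-level config. To exercise the second path listed in the problem description (the Hudi write option), a variant along these lines could be used. This is a sketch, not the code that produced the results in this report: `with_hudi_output_type` and `write_hudi` are hypothetical helper names, and only the option key `hoodie.parquet.outputtimestamptype` is taken from the report.

```python
def with_hudi_output_type(base_opts, out_ts):
    """Return a copy of the Hudi write options with the timestamp output type set.

    Hypothetical helper; the option key is the one named in this report.
    """
    opts = dict(base_opts)  # copy so the shared options dict is not mutated
    opts["hoodie.parquet.outputtimestamptype"] = out_ts
    return opts


def write_hudi(df, path, opts):
    """Write `df` as a Hudi table at `path`, using the same call chain as the repro."""
    df.write.format("hudi").options(**opts).mode("overwrite").save(path)
```

Inside `run`, the Hudi write would then become `write_hudi(df, hp, with_hudi_output_type(HUDI_OPTS, "TIMESTAMP_MILLIS"))`, with the rest of the script unchanged.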
   
   ## Expected behavior
   
   Same Parquet logical type as Spark-direct:
   
   ```
   session_MILLIS: Spark-direct = ts_tz (TIMESTAMP(MILLIS,true))  |  Hudi = ts_tz (TIMESTAMP(MILLIS,true))
   session_MICROS: Spark-direct = ts_tz (TIMESTAMP(MICROS,true))  |  Hudi = ts_tz (TIMESTAMP(MICROS,true))
   session_INT96 : Spark-direct = ts_tz                           |  Hudi = ts_tz
   ```
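The expectation above can also be written as an automated check. The following is a minimal sketch, not part of the original repro: `EXPECTED` and `column_type` are hypothetical names, and the parser assumes the one-field-per-line schema text that the repro script reads from `getSchema().toString()`, without field IDs or nested groups.

```python
import re

# Expected Parquet annotation for each outputTimestampType value, taken from
# the "Expected behavior" output. INT96 has no logical-type annotation.
EXPECTED = {
    "TIMESTAMP_MILLIS": "TIMESTAMP(MILLIS,true)",
    "TIMESTAMP_MICROS": "TIMESTAMP(MICROS,true)",
    "INT96": None,
}


def column_type(schema_text, col):
    """Return (physical_type, annotation or None) for a top-level column."""
    pattern = re.compile(
        r"^\s*(?:required|optional|repeated)\s+(\S+)\s+"
        + re.escape(col)
        + r"(?:\s+\((.*)\))?;\s*$"
    )
    for line in schema_text.splitlines():
        m = pattern.match(line)
        if m:
            return m.group(1), m.group(2)
    raise KeyError(col)


# Illustrative schema text shaped like the Hudi result reported in this issue.
sample = """message spark_schema {
  required int32 id;
  required int64 ts_tz (TIMESTAMP(MICROS,true));
}"""
physical, annotation = column_type(sample, "ts_tz")
honors_millis = annotation == EXPECTED["TIMESTAMP_MILLIS"]
```

With the sample text, `honors_millis` is `False`, which is the divergence this issue describes for the MILLIS case.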
   
   ## Actual behavior
   
   ```
   session_MILLIS: Spark-direct = ts_tz (TIMESTAMP(MILLIS,true))  |  Hudi = ts_tz (TIMESTAMP(MICROS,true))   << diverged
   session_MICROS: Spark-direct = ts_tz (TIMESTAMP(MICROS,true))  |  Hudi = ts_tz (TIMESTAMP(MICROS,true))   << matches
   session_INT96 : Spark-direct = ts_tz                           |  Hudi = ts_tz (TIMESTAMP(MICROS,true))   << diverged
   ```
   
   Reproduced identically against three bundles by only swapping `--jars`:
   
   | Bundle (Maven Central / Apache staging) | session MILLIS | Hudi option MILLIS | session INT96 |
   |---|---|---|---|
   | `hudi-spark3.4-bundle_2.12-0.15.0.jar` | reproduces | reproduces | reproduces |
   | `hudi-spark3.4-bundle_2.12-0.15.1-rc1.jar` | reproduces | reproduces | reproduces |
   | `hudi-spark3.4-bundle_2.12-1.1.1.jar` | reproduces | reproduces | reproduces |
   
   Bug spans at least 0.15.0 → 1.1.1, surviving the 1.x rewrite.
   
   ## Environment Description
   
   - Hudi version: 0.15.0, 0.15.1-rc1, 1.1.1 (all reproduce; Spark 3.4 bundles, Scala 2.12)
   - Spark version: 3.4.3
   - Hadoop version: 3 (bundled Spark distribution)
   - Storage: local FS; bug is in the Parquet writer logical-type selection and is storage-independent
   - Running on Docker?: optional
   
   ## Additional context
   
   - `TimestampNTZType` is unaffected (Spark itself ignores `outputTimestampType` for NTZ).
   - `hoodie.parquet.outputtimestamptype` is documented but appears to be effectively dead code across at least three releases.
   - Workaround for users needing MILLIS: convert to LongType epoch-millis before writing. No workaround for INT96 / legacy Hive interop without re-writing files outside Hudi.
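
The MILLIS workaround in the last bullet could look roughly like this. It is a sketch under assumptions: `epoch_millis` and `to_epoch_millis_column` are hypothetical helper names, and the Spark side relies on the SQL function `unix_millis` (available since Spark 3.1), which drops sub-millisecond digits. The pure-Python helper mirrors that arithmetic so the precision loss is visible without a Spark session.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def epoch_millis(ts):
    """Whole milliseconds since the Unix epoch for a tz-aware datetime (floors)."""
    return (ts - EPOCH) // datetime.timedelta(milliseconds=1)


def to_epoch_millis_column(df, src="ts_tz", dst="ts_ms"):
    """Replace TimestampType column `src` with a LongType epoch-millis column `dst`.

    Uses selectExpr so the sketch needs no pyspark imports; `df` is assumed to
    be a Spark DataFrame like the one built in the repro script.
    """
    return df.selectExpr("*", f"unix_millis({src}) AS {dst}").drop(src)


# The repro row's timestamp: the trailing 456 microseconds are dropped.
ts = datetime.datetime(2026, 5, 15, 12, 34, 56, 123456, tzinfo=datetime.timezone.utc)
millis_part = epoch_millis(ts) % 1000
```

The trade-off is that the column is no longer a timestamp in the table schema, so readers have to interpret the long value as epoch milliseconds themselves.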
   
   ## Stacktrace
   
   n/a — no exception; the bug is silent.
   

