This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7c30a8cca4a [SPARK-38829][SQL] Introduce conf 
spark.sql.parquet.inferTimestampNTZ.enabled for TimestampNTZ inference on 
Parquet
7c30a8cca4a is described below

commit 7c30a8cca4ad2c10c79f4afec8fc4ca2a3cdd10b
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Wed Feb 1 21:53:12 2023 -0800

    [SPARK-38829][SQL] Introduce conf 
spark.sql.parquet.inferTimestampNTZ.enabled for TimestampNTZ inference on 
Parquet
    
    ### What changes were proposed in this pull request?
    
    Introduces conf `spark.sql.parquet.inferTimestampNTZ.enabled` for 
TimestampNTZ inference on Parquet. When enabled, Parquet timestamp columns with 
annotation isAdjustedToUTC = false are inferred as TIMESTAMP_NTZ type during 
schema inference. Otherwise, all the Parquet timestamp columns are inferred as 
TIMESTAMP_LTZ types. Note that Spark writes the output schema into Parquet's 
footer metadata on file writing and leverages it on file reading. Thus this 
configuration only affects the schema inference on Parquet files which are not 
written by Spark.
    
    This PR also removes the configuration 
`spark.sql.parquet.timestampNTZ.enabled`. That configuration is too "heavy": 
when it is false, Spark can't write TimestampNTZ columns to Parquet files.
    ### Why are the changes needed?
    
    The only compatibility issue for supporting TimestampNTZ on the Parquet 
data source is schema inference on files that were not written by Spark. 
Thus a flag that controls only this inference is sufficient.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, TimestampNTZ is not released yet.
    ### How was this patch tested?
    
    UT
    
    Closes #39856 from gengliangwang/inferParuqetTimestampNTZ.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
    (cherry picked from commit ae24327e8984e7ee8ba948e3be0b6b2f28df68d0)
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 17 +++---
 .../parquet/SpecificParquetRecordReaderBase.java   |  4 +-
 .../datasources/parquet/ParquetFileFormat.scala    | 10 ++--
 .../datasources/parquet/ParquetReadSupport.scala   | 62 ++++++++--------------
 .../parquet/ParquetSchemaConverter.scala           | 24 ++++-----
 .../datasources/parquet/ParquetUtils.scala         |  4 --
 .../datasources/v2/parquet/ParquetScan.scala       |  4 +-
 .../parquet/ParquetFieldIdSchemaSuite.scala        |  6 +--
 .../datasources/parquet/ParquetIOSuite.scala       | 40 ++++++--------
 .../datasources/parquet/ParquetSchemaSuite.scala   | 27 +++++-----
 10 files changed, 81 insertions(+), 117 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f407dda009c..1cc3b61b834 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1129,13 +1129,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val PARQUET_TIMESTAMP_NTZ_ENABLED =
-    buildConf("spark.sql.parquet.timestampNTZ.enabled")
-      .doc(s"Enables ${TimestampTypes.TIMESTAMP_NTZ} support for Parquet reads 
and writes. " +
-        s"When enabled, ${TimestampTypes.TIMESTAMP_NTZ} values are written as 
Parquet timestamp " +
-        "columns with annotation isAdjustedToUTC = false and are inferred in a 
similar way. " +
-        s"When disabled, such values are read as 
${TimestampTypes.TIMESTAMP_LTZ} and have to be " +
-        s"converted to ${TimestampTypes.TIMESTAMP_LTZ} for writes.")
+  val PARQUET_INFER_TIMESTAMP_NTZ_ENABLED =
+    buildConf("spark.sql.parquet.inferTimestampNTZ.enabled")
+      .doc("When enabled, Parquet timestamp columns with annotation 
isAdjustedToUTC = false " +
+        "are inferred as TIMESTAMP_NTZ type during schema inference. 
Otherwise, all the Parquet " +
+        "timestamp columns are inferred as TIMESTAMP_LTZ types. Note that 
Spark writes the " +
+        "output schema into Parquet's footer metadata on file writing and 
leverages it on file " +
+        "reading. Thus this configuration only affects the schema inference on 
Parquet files " +
+        "which are not written by Spark.")
       .version("3.4.0")
       .booleanConf
       .createWithDefault(true)
@@ -4943,7 +4944,7 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = 
getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
-  def parquetTimestampNTZEnabled: Boolean = 
getConf(PARQUET_TIMESTAMP_NTZ_ENABLED)
+  def parquetInferTimestampNTZEnabled: Boolean = 
getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 6ea1a0c37b1..b14f329b413 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -149,7 +149,7 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    config.setBoolean(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), 
false);
 
     this.file = new Path(path);
     long length = 
this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -200,7 +200,7 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
-    config.setBoolean(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), 
false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b4651e3260..afa00aa6f37 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -151,8 +151,8 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetTimestampNTZEnabled)
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
@@ -357,7 +357,7 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      timestampNTZEnabled = 
sparkSession.sessionState.conf.parquetTimestampNTZEnabled)
+      inferTimestampNTZ = 
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -453,14 +453,14 @@ object ParquetFileFormat extends Logging {
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = 
sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = 
sparkSession.sessionState.conf.isParquetINT96AsTimestamp
-    val timestampNTZEnabled = 
sparkSession.sessionState.conf.parquetTimestampNTZEnabled
+    val inferTimestampNTZ = 
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
 
     val reader = (files: Seq[FileStatus], conf: Configuration, 
ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL 
`StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        timestampNTZEnabled = timestampNTZEnabled)
+        inferTimestampNTZ = inferTimestampNTZ)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 1d35e9ea049..6e29afce491 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -130,8 +130,8 @@ object ParquetReadSupport extends Logging {
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
     val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key,
       SQLConf.PARQUET_FIELD_ID_READ_ENABLED.defaultValue.get)
-    val timestampNTZEnabled = 
conf.getBoolean(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.defaultValue.get)
+    val inferTimestampNTZ = 
conf.getBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get)
     val ignoreMissingIds = 
conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
       SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
 
@@ -152,7 +152,7 @@ object ParquetReadSupport extends Logging {
            |""".stripMargin)
     }
     val parquetClippedSchema = 
ParquetReadSupport.clipParquetSchema(parquetFileSchema,
-      catalystRequestedSchema, caseSensitive, useFieldId, timestampNTZEnabled)
+      catalystRequestedSchema, caseSensitive, useFieldId)
 
     // We pass two schema to ParquetRecordMaterializer:
     // - parquetRequestedSchema: the schema of the file data we want to read
@@ -194,10 +194,9 @@ object ParquetReadSupport extends Logging {
       parquetSchema: MessageType,
       catalystSchema: StructType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): MessageType = {
+      useFieldId: Boolean): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(
-      parquetSchema.asGroupType(), catalystSchema, caseSensitive, useFieldId, 
timestampNTZEnabled)
+      parquetSchema.asGroupType(), catalystSchema, caseSensitive, useFieldId)
     if (clippedParquetFields.isEmpty) {
       ParquetSchemaConverter.EMPTY_MESSAGE
     } else {
@@ -212,25 +211,21 @@ object ParquetReadSupport extends Logging {
       parquetType: Type,
       catalystType: DataType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): Type = {
+      useFieldId: Boolean): Type = {
     val newParquetType = catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType, 
caseSensitive, useFieldId,
-          timestampNTZEnabled)
+        clipParquetListType(parquetType.asGroupType(), t.elementType, 
caseSensitive, useFieldId)
 
       case t: MapType
         if !isPrimitiveCatalystType(t.keyType) ||
            !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
         clipParquetMapType(
-          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, 
useFieldId,
-          timestampNTZEnabled)
+          parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive, 
useFieldId)
 
       case t: StructType =>
-        clipParquetGroup(
-          parquetType.asGroupType(), t, caseSensitive, useFieldId, 
timestampNTZEnabled)
+        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive, 
useFieldId)
 
       case _ =>
         // UDTs and primitive types are not clipped.  For UDTs, a clipped 
version might not be able
@@ -266,8 +261,7 @@ object ParquetReadSupport extends Logging {
       parquetList: GroupType,
       elementType: DataType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): Type = {
+      useFieldId: Boolean): Type = {
     // Precondition of this method, should only be called for lists with 
nested element types.
     assert(!isPrimitiveCatalystType(elementType))
 
@@ -275,7 +269,7 @@ object ParquetReadSupport extends Logging {
     // list element type is just the group itself.  Clip it.
     if (parquetList.getLogicalTypeAnnotation == null &&
       parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType, caseSensitive, useFieldId, 
timestampNTZEnabled)
+      clipParquetType(parquetList, elementType, caseSensitive, useFieldId)
     } else {
       assert(
         
parquetList.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation],
@@ -309,15 +303,14 @@ object ParquetReadSupport extends Logging {
           .as(LogicalTypeAnnotation.listType())
           .addField(
             clipParquetType(
-              repeatedGroup, elementType, caseSensitive, useFieldId, 
timestampNTZEnabled))
+              repeatedGroup, elementType, caseSensitive, useFieldId))
           .named(parquetList.getName)
       } else {
         val newRepeatedGroup = Types
           .repeatedGroup()
           .addField(
             clipParquetType(
-              repeatedGroup.getType(0), elementType, caseSensitive, useFieldId,
-              timestampNTZEnabled))
+              repeatedGroup.getType(0), elementType, caseSensitive, 
useFieldId))
           .named(repeatedGroup.getName)
 
         val newElementType = if (useFieldId && repeatedGroup.getId != null) {
@@ -347,8 +340,7 @@ object ParquetReadSupport extends Logging {
       keyType: DataType,
       valueType: DataType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): GroupType = {
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or 
value types.
     assert(!isPrimitiveCatalystType(keyType) || 
!isPrimitiveCatalystType(valueType))
 
@@ -361,11 +353,9 @@ object ParquetReadSupport extends Logging {
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
         .addField(
-          clipParquetType(
-            parquetKeyType, keyType, caseSensitive, useFieldId, 
timestampNTZEnabled))
+          clipParquetType(parquetKeyType, keyType, caseSensitive, useFieldId))
         .addField(
-          clipParquetType(
-            parquetValueType, valueType, caseSensitive, useFieldId, 
timestampNTZEnabled))
+          clipParquetType(parquetValueType, valueType, caseSensitive, 
useFieldId))
         .named(repeatedGroup.getName)
       if (useFieldId && repeatedGroup.getId != null) {
         newRepeatedGroup.withId(repeatedGroup.getId.intValue())
@@ -393,11 +383,9 @@ object ParquetReadSupport extends Logging {
       parquetRecord: GroupType,
       structType: StructType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): GroupType = {
+      useFieldId: Boolean): GroupType = {
     val clippedParquetFields =
-      clipParquetGroupFields(parquetRecord, structType, caseSensitive, 
useFieldId,
-        timestampNTZEnabled)
+      clipParquetGroupFields(parquetRecord, structType, caseSensitive, 
useFieldId)
     Types
       .buildGroup(parquetRecord.getRepetition)
       .as(parquetRecord.getLogicalTypeAnnotation)
@@ -414,12 +402,10 @@ object ParquetReadSupport extends Logging {
       parquetRecord: GroupType,
       structType: StructType,
       caseSensitive: Boolean,
-      useFieldId: Boolean,
-      timestampNTZEnabled: Boolean): Seq[Type] = {
+      useFieldId: Boolean): Seq[Type] = {
     val toParquet = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = false,
-      useFieldId = useFieldId,
-      timestampNTZEnabled = timestampNTZEnabled)
+      useFieldId = useFieldId)
     lazy val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
     lazy val caseInsensitiveParquetFieldMap =
@@ -430,7 +416,7 @@ object ParquetReadSupport extends Logging {
     def matchCaseSensitiveField(f: StructField): Type = {
       caseSensitiveParquetFieldMap
           .get(f.name)
-          .map(clipParquetType(_, f.dataType, caseSensitive, useFieldId, 
timestampNTZEnabled))
+          .map(clipParquetType(_, f.dataType, caseSensitive, useFieldId))
           .getOrElse(toParquet.convertField(f))
     }
 
@@ -445,8 +431,7 @@ object ParquetReadSupport extends Logging {
               throw 
QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(
-                parquetTypes.head, f.dataType, caseSensitive, useFieldId, 
timestampNTZEnabled)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, 
useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
     }
@@ -462,8 +447,7 @@ object ParquetReadSupport extends Logging {
             throw 
QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
               fieldId, parquetTypesString)
           } else {
-            clipParquetType(
-              parquetTypes.head, f.dataType, caseSensitive, useFieldId, 
timestampNTZEnabled)
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, 
useFieldId)
           }
         }.getOrElse {
           // When there is no ID match, we use a fake name to avoid a name 
match by accident
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index bb0df3639dc..f6b02579d31 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -48,31 +48,31 @@ import org.apache.spark.sql.types._
  *        [[TimestampType]] fields.
  * @param caseSensitive Whether use case sensitive analysis when comparing 
Spark catalyst read
  *                      schema with Parquet schema.
- * @param timestampNTZEnabled Whether TimestampNTZType type is enabled.
+ * @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = 
SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
-    timestampNTZEnabled: Boolean = 
SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    inferTimestampNTZ: Boolean = 
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
-    timestampNTZEnabled = conf.parquetTimestampNTZEnabled)
+    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = 
conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = 
conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
-    timestampNTZEnabled = 
conf.get(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    inferTimestampNTZ = 
conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
 
   /**
    * Returns true if TIMESTAMP_NTZ type is enabled in this 
ParquetToSparkSchemaConverter.
    */
   def isTimestampNTZEnabled(): Boolean = {
-    timestampNTZEnabled
+    inferTimestampNTZ
   }
 
   /**
@@ -266,7 +266,7 @@ class ParquetToSparkSchemaConverter(
             }
           case timestamp: TimestampLogicalTypeAnnotation
             if timestamp.getUnit == TimeUnit.MICROS || timestamp.getUnit == 
TimeUnit.MILLIS =>
-            if (timestamp.isAdjustedToUTC || !timestampNTZEnabled) {
+            if (timestamp.isAdjustedToUTC || !inferTimestampNTZ) {
               TimestampType
             } else {
               TimestampNTZType
@@ -460,27 +460,23 @@ class ParquetToSparkSchemaConverter(
  * @param outputTimestampType which parquet timestamp type to use when writing.
  * @param useFieldId whether we should include write field id to Parquet 
schema. Set this to false
  *        via `spark.sql.parquet.fieldId.write.enabled = false` to disable 
writing field ids.
- * @param timestampNTZEnabled whether TIMESTAMP_NTZ type support is enabled.
  */
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = 
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
       SQLConf.ParquetOutputTimestampType.INT96,
-    useFieldId: Boolean = 
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get,
-    timestampNTZEnabled: Boolean = 
SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    useFieldId: Boolean = 
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
     outputTimestampType = conf.parquetOutputTimestampType,
-    useFieldId = conf.parquetFieldIdWriteEnabled,
-    timestampNTZEnabled = conf.parquetTimestampNTZEnabled)
+    useFieldId = conf.parquetFieldIdWriteEnabled)
 
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = 
conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
       conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
-    useFieldId = 
conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key).toBoolean,
-    timestampNTZEnabled = 
conf.get(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    useFieldId = 
conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key).toBoolean)
 
   /**
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
@@ -568,7 +564,7 @@ class SparkToParquetSchemaConverter(
               .as(LogicalTypeAnnotation.timestampType(true, 
TimeUnit.MILLIS)).named(field.name)
         }
 
-      case TimestampNTZType if timestampNTZEnabled =>
+      case TimestampNTZType =>
         Types.primitive(INT64, repetition)
           .as(LogicalTypeAnnotation.timestampType(false, 
TimeUnit.MICROS)).named(field.name)
       case BinaryType =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 33438560e95..de4eda1acfd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -461,10 +461,6 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
 
-    conf.set(
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      sqlConf.parquetTimestampNTZEnabled.toString)
-
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, 
parquetOptions.compressionCodecClassName)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 619a8fe66e3..7495893a911 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -89,8 +89,8 @@ case class ParquetScan(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetTimestampNTZEnabled)
+      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
 
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
index 400e0ce28ea..b3babdd3a0c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
@@ -39,16 +39,14 @@ class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
       catalystSchema: StructType,
       expectedSchema: String,
       caseSensitive: Boolean = true,
-      useFieldId: Boolean = true,
-      timestampNTZEnabled: Boolean = true): Unit = {
+      useFieldId: Boolean = true): Unit = {
     test(s"Clipping with field id - $testName") {
       val fileSchema = MessageTypeParser.parseMessageType(parquetSchema)
       val actual = ParquetReadSupport.clipParquetSchema(
         fileSchema,
         catalystSchema,
         caseSensitive = caseSensitive,
-        useFieldId = useFieldId,
-        timestampNTZEnabled = timestampNTZEnabled)
+        useFieldId = useFieldId)
 
       // each fake name should be uniquely generated
       val fakeColumnNames = 
actual.getPaths.asScala.flatten.filter(_.startsWith(FAKE_COLUMN_NAME))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index df96403ac50..8670d95c65e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -156,7 +156,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   }
 
   test("SPARK-36182: TimestampNTZ") {
-    withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> "true") {
       val data = Seq("2021-01-01T00:00:00", "1970-07-15T01:02:03.456789")
         .map(ts => Tuple1(LocalDateTime.parse(ts)))
       withAllParquetReaders {
@@ -193,9 +193,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
         }
         writer.close
 
-        for (timestampNTZEnabled <- Seq(true, false)) {
-          withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> 
s"$timestampNTZEnabled") {
-            val timestampNTZType = if (timestampNTZEnabled) TimestampNTZType 
else TimestampType
+        for (inferTimestampNTZ <- Seq(true, false)) {
+          withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> 
s"$inferTimestampNTZ") {
+            val timestampNTZType = if (inferTimestampNTZ) TimestampNTZType 
else TimestampType
 
             withAllParquetReaders {
               val df = spark.read.parquet(tablePath.toString)
@@ -214,7 +214,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
               val ltz_value = new java.sql.Timestamp(1000L)
               val ntz_value = LocalDateTime.of(1970, 1, 1, 0, 0, 1)
 
-              val exp = if (timestampNTZEnabled) {
+              val exp = if (inferTimestampNTZ) {
                 (0 until numRecords).map { _ =>
                   (ltz_value, ltz_value, ltz_value, ltz_value, ntz_value, 
ntz_value)
                 }.toDF()
@@ -233,26 +233,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   }
 
   test("Write TimestampNTZ type") {
-    // Writes should fail if timestamp_ntz support is disabled.
-    withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> "false") {
-      withTempPath { dir =>
-        val data = Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
-        val err = intercept[Exception] {
+    // The configuration PARQUET_INFER_TIMESTAMP_NTZ_ENABLED doesn't affect 
the behavior of writes.
+    Seq(true, false).foreach { inferTimestampNTZ =>
+      withSQLConf(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key -> 
inferTimestampNTZ.toString) {
+        withTempPath { dir =>
+          val data = 
Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
           data.write.parquet(dir.getCanonicalPath)
-        }.getCause
-        assert(err.getMessage.contains("Unsupported data type timestamp_ntz"))
-      }
-    }
-
-    withSQLConf(SQLConf.PARQUET_TIMESTAMP_NTZ_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val data = Seq(LocalDateTime.parse("2021-01-01T00:00:00")).toDF("col")
-        data.write.parquet(dir.getCanonicalPath)
-        assertResult(spark.read.parquet(dir.getCanonicalPath).schema) {
-          StructType(
-            StructField("col", TimestampNTZType, nullable = true) ::
-            Nil
-          )
+          assertResult(spark.read.parquet(dir.getCanonicalPath).schema) {
+            StructType(
+              StructField("col", TimestampNTZType, nullable = true) ::
+                Nil
+            )
+          }
         }
       }
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 4c0fbfda681..468e31d1879 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -63,14 +63,14 @@ abstract class ParquetSchemaTest extends ParquetTest with 
SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       caseSensitive: Boolean = false,
-      timestampNTZEnabled: Boolean = true,
+      inferTimestampNTZ: Boolean = true,
       sparkReadSchema: Option[StructType] = None,
       expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
-      timestampNTZEnabled = timestampNTZEnabled)
+      inferTimestampNTZ = inferTimestampNTZ)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -97,11 +97,10 @@ abstract class ParquetSchemaTest extends ParquetTest with 
SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      timestampNTZEnabled: Boolean = true): Unit = {
+      inferTimestampNTZ: Boolean = true): Unit = {
     val converter = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = writeLegacyParquetFormat,
-      outputTimestampType = outputTimestampType,
-      timestampNTZEnabled = timestampNTZEnabled)
+      outputTimestampType = outputTimestampType)
 
     test(s"sql => parquet: $testName") {
       val actual = converter.convert(sqlSchema)
@@ -2261,7 +2260,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         """.stripMargin,
       binaryAsString = true,
       int96AsTimestamp = int96AsTimestamp,
-      timestampNTZEnabled = true)
+      inferTimestampNTZ = true)
   }
 
   testCatalystToParquet(
@@ -2286,14 +2285,14 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
       """.stripMargin,
     writeLegacyParquetFormat = false,
-    timestampNTZEnabled = true)
+    inferTimestampNTZ = true)
 
-  for (timestampNTZEnabled <- Seq(true, false)) {
-    val dataType = if (timestampNTZEnabled) TimestampNTZType else TimestampType
+  for (inferTimestampNTZ <- Seq(true, false)) {
+    val dataType = if (inferTimestampNTZ) TimestampNTZType else TimestampType
 
     testParquetToCatalyst(
       "TimestampNTZ Parquet to Spark conversion for complex types, " +
-        s"timestampNTZEnabled: $timestampNTZEnabled",
+        s"inferTimestampNTZ: $inferTimestampNTZ",
       StructType(
         Seq(
           StructField("f1", dataType),
@@ -2315,7 +2314,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         """.stripMargin,
       binaryAsString = true,
       int96AsTimestamp = false,
-      timestampNTZEnabled = timestampNTZEnabled)
+      inferTimestampNTZ = inferTimestampNTZ)
   }
 
   private def testSchemaClipping(
@@ -2339,8 +2338,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
         MessageTypeParser.parseMessageType(parquetSchema),
         catalystSchema,
         caseSensitive,
-        useFieldId = false,
-        timestampNTZEnabled = true)
+        useFieldId = false)
 
       try {
         expectedSchema.checkContains(actual)
@@ -2907,8 +2905,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
          MessageTypeParser.parseMessageType(parquetSchema),
           catalystSchema,
           caseSensitive = false,
-          useFieldId = false,
-          timestampNTZEnabled = false)
+          useFieldId = false)
       }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to