This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5582f92  [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64
5582f92 is described below

commit 5582f92046a3486dc6d30e6e4083446fdbd52667
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Mar 24 22:43:32 2022 +0800

    [SPARK-37463][SQL] Read/Write Timestamp ntz from/to Orc uses int64

    ### What changes were proposed in this pull request?
    #33588 (comment) shows that Spark cannot read/write timestamp ntz and ltz correctly.
    Based on the discussion at https://github.com/apache/spark/pull/34741#issuecomment-983660633, we fix reading/writing timestamp ntz from/to ORC by using int64.

    ### Why are the changes needed?
    Fix the bug of reading/writing timestamp ntz from/to ORC under different time zones.

    ### Does this PR introduce _any_ user-facing change?
    Yes. ORC timestamp ntz is a new feature.

    ### How was this patch tested?
    New tests.

    Closes #34984 from beliefer/SPARK-37463-int64.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit e410d98f57750080ad46932cc9211d2cf5154c24)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/orc/OrcAtomicColumnVector.java     | 10 -----
 .../datasources/orc/OrcDeserializer.scala          |  5 +--
 .../execution/datasources/orc/OrcFileFormat.scala  |  9 ++---
 .../sql/execution/datasources/orc/OrcFilters.scala | 11 ++----
 .../execution/datasources/orc/OrcSerializer.scala  |  4 +-
 .../sql/execution/datasources/orc/OrcUtils.scala   | 46 ++++++----------------
 .../datasources/parquet/ParquetRowConverter.scala  |  4 ++
 .../v2/orc/OrcPartitionReaderFactory.scala         | 16 +++-----
 .../execution/datasources/v2/orc/OrcWrite.scala    |  2 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 26 ++++++++++++
 10 files changed, 59 insertions(+), 74 deletions(-)
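The failure mode being fixed: ORC's TIMESTAMP writable is backed by java.sql.Timestamp, which models an instant, so the long value it carries for a given wall-clock timestamp depends on the JVM default time zone. A minimal, self-contained demonstration of that zone dependence (plain Scala, not Spark code):

    import java.sql.Timestamp
    import java.util.TimeZone

    object ZoneDependence {
      // Timestamp.valueOf interprets the wall-clock text in the JVM default
      // zone, so the same text yields different epoch millis per zone.
      def wallClockMillis(text: String): Long = Timestamp.valueOf(text).getTime

      def main(args: Array[String]): Unit = {
        TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
        val la = wallClockMillis("2021-06-01 00:00:00")
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
        val utc = wallClockMillis("2021-06-01 00:00:00")
        assert(la != utc) // differs by the zone offset: the NTZ round-trip bug
      }
    }

Storing timestamp ntz as a plain int64 of epoch microseconds avoids the instant conversion entirely, which is what the diffs below do.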
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
index b4f7b99..c2d8334 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
@@ -27,7 +27,6 @@ import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
-import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -37,7 +36,6 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public class OrcAtomicColumnVector extends OrcColumnVector {
   private final boolean isTimestamp;
-  private final boolean isTimestampNTZ;
   private final boolean isDate;
 
   // Column vector for each type. Only 1 is populated for any type.
@@ -56,12 +54,6 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
       isTimestamp = false;
     }
 
-    if (type instanceof TimestampNTZType) {
-      isTimestampNTZ = true;
-    } else {
-      isTimestampNTZ = false;
-    }
-
     if (type instanceof DateType) {
       isDate = true;
     } else {
@@ -113,8 +105,6 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
       return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
-    } else if (isTimestampNTZ) {
-      return OrcUtils.fromOrcNTZ(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 0c2856c..564e42e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -105,7 +105,7 @@ class OrcDeserializer(
       case IntegerType | _: YearMonthIntervalType => (ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[IntWritable].get)
 
-      case LongType | _: DayTimeIntervalType => (ordinal, value) =>
+      case LongType | _: DayTimeIntervalType | _: TimestampNTZType => (ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[LongWritable].get)
 
       case FloatType => (ordinal, value) =>
@@ -129,9 +129,6 @@ class OrcDeserializer(
       case TimestampType => (ordinal, value) =>
         updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
 
-      case TimestampNTZType => (ordinal, value) =>
-        updater.setLong(ordinal, OrcUtils.fromOrcNTZ(value.asInstanceOf[OrcTimestamp]))
-
       case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
         val v = OrcShimUtils.getDecimal(value)
         v.changePrecision(precision, scale)
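With the OrcTimestamp special cases removed, the deserializer handles NTZ exactly like LongType: the LongWritable already carries microseconds since the epoch. A rough sketch of that int64 encoding, mirroring what Spark's DateTimeUtils.localDateTimeToMicros / microsToLocalDateTime do (simplified, not the actual implementation):

    import java.time.{LocalDateTime, ZoneOffset}
    import java.time.temporal.ChronoUnit

    object NtzMicros {
      private val Epoch = LocalDateTime.of(1970, 1, 1, 0, 0, 0)

      // Encode a wall-clock value as microseconds since the epoch; no time
      // zone is consulted, which is the whole point of the NTZ type.
      def toMicros(ldt: LocalDateTime): Long = ChronoUnit.MICROS.between(Epoch, ldt)

      // Decode the int64 back to the identical wall-clock value.
      def fromMicros(micros: Long): LocalDateTime =
        LocalDateTime.ofEpochSecond(
          Math.floorDiv(micros, 1000000L),
          (Math.floorMod(micros, 1000000L) * 1000L).toInt,
          ZoneOffset.UTC)

      def main(args: Array[String]): Unit = {
        val ts = LocalDateTime.parse("1883-11-16T00:00:00") // pre-epoch works too
        assert(fromMicros(toMicros(ts)) == ts)
      }
    }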
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 39a8763..2b060c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -142,11 +142,10 @@ class OrcFileFormat
       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val resultedColPruneInfo =
-        Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
-          OrcUtils.requestedColumnIds(
-            isCaseSensitive, dataSchema, requiredSchema, reader, conf)
-        }
+      val orcSchema =
+        Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions))(_.getSchema)
+      val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+        isCaseSensitive, dataSchema, requiredSchema, orcSchema, conf)
 
       if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 0d85a45..4bb1c18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import java.sql.Timestamp
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
 
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
@@ -143,11 +142,11 @@ private[sql] object OrcFilters extends OrcFiltersBase {
   def getPredicateLeafType(dataType: DataType): PredicateLeaf.Type = dataType match {
     case BooleanType => PredicateLeaf.Type.BOOLEAN
     case ByteType | ShortType | IntegerType | LongType |
-         _: AnsiIntervalType => PredicateLeaf.Type.LONG
+         _: AnsiIntervalType | TimestampNTZType => PredicateLeaf.Type.LONG
     case FloatType | DoubleType => PredicateLeaf.Type.FLOAT
     case StringType => PredicateLeaf.Type.STRING
     case DateType => PredicateLeaf.Type.DATE
-    case TimestampType | TimestampNTZType => PredicateLeaf.Type.TIMESTAMP
+    case TimestampType => PredicateLeaf.Type.TIMESTAMP
     case _: DecimalType => PredicateLeaf.Type.DECIMAL
     case _ => throw QueryExecutionErrors.unsupportedOperationForDataTypeError(dataType)
   }
@@ -170,11 +169,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case _: TimestampType if value.isInstanceOf[Instant] =>
       toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
     case _: TimestampNTZType if value.isInstanceOf[LocalDateTime] =>
-      val orcTimestamp = OrcUtils.toOrcNTZ(localDateTimeToMicros(value.asInstanceOf[LocalDateTime]))
-      // Hive meets OrcTimestamp will throw ClassNotFoundException, So convert it.
-      val timestamp = new Timestamp(orcTimestamp.getTime)
-      timestamp.setNanos(orcTimestamp.getNanos)
-      timestamp
+      localDateTimeToMicros(value.asInstanceOf[LocalDateTime])
     case _: YearMonthIntervalType =>
       IntervalUtils.periodToMonths(value.asInstanceOf[Period]).longValue()
     case _: DayTimeIntervalType =>
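Since getPredicateLeafType now maps NTZ to PredicateLeaf.Type.LONG and the literal is reduced to localDateTimeToMicros(...), a pushed-down NTZ filter becomes an ordinary long comparison. A sketch of the resulting SearchArgument, built with the same hive-storage-api builder OrcFilters uses (the column name and literal are illustrative):

    import java.time.LocalDateTime
    import java.time.temporal.ChronoUnit

    import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}

    object NtzPushdownSketch {
      def main(args: Array[String]): Unit = {
        val epoch = LocalDateTime.of(1970, 1, 1, 0, 0)
        // The NTZ literal is reduced to a zone-independent microsecond count...
        val micros: java.lang.Long =
          ChronoUnit.MICROS.between(epoch, LocalDateTime.parse("2021-06-01T00:00:00"))

        // ...and pushed down as a plain LONG predicate leaf.
        val sarg = SearchArgumentFactory.newBuilder()
          .startAnd()
          .equals("ts_ntz1", PredicateLeaf.Type.LONG, micros)
          .end()
          .build()
        println(sarg)
      }
    }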
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index a928cd9..5ed73c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -98,7 +98,7 @@ class OrcSerializer(dataSchema: StructType) {
         }
 
-    case LongType | _: DayTimeIntervalType =>
+    case LongType | _: DayTimeIntervalType | _: TimestampNTZType =>
       if (reuseObj) {
         val result = new LongWritable()
         (getter, ordinal) =>
@@ -147,8 +147,6 @@ class OrcSerializer(dataSchema: StructType) {
         result.setNanos(ts.getNanos)
         result
 
-    case TimestampNTZType => (getter, ordinal) => OrcUtils.toOrcNTZ(getter.getLong(ordinal))
-
     case DecimalType.Fixed(precision, scale) =>
       OrcShimUtils.getHiveDecimalWritable(precision, scale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 1f05117..a68ce1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.nio.charset.StandardCharsets.UTF_8
-import java.sql.Timestamp
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -29,7 +28,6 @@
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.serde2.io.DateWritable
 import org.apache.hadoop.io.{BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, ShortWritable, WritableComparable}
 import org.apache.orc.{BooleanColumnStatistics, ColumnStatistics, DateColumnStatistics, DoubleColumnStatistics,
   IntegerColumnStatistics, OrcConf, OrcFile, Reader, TypeDescription, Writer}
-import org.apache.orc.mapred.OrcTimestamp
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -39,8 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils, DateTimeUtils}
-import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, SchemaMergeUtils}
@@ -199,7 +197,7 @@ object OrcUtils extends Logging {
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
-      reader: Reader,
+      orcSchema: TypeDescription,
       conf: Configuration): Option[(Array[Int], Boolean)] = {
     def checkTimestampCompatibility(orcCatalystSchema: StructType, dataSchema: StructType): Unit = {
       orcCatalystSchema.fields.map(_.dataType).zip(dataSchema.fields.map(_.dataType)).foreach {
@@ -212,7 +210,6 @@ object OrcUtils extends Logging {
       }
     }
 
-    val orcSchema = reader.getSchema
     checkTimestampCompatibility(toCatalystSchema(orcSchema), dataSchema)
     val orcFieldNames = orcSchema.getFieldNames.asScala
     val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
@@ -261,7 +258,6 @@ object OrcUtils extends Logging {
           if (matchedOrcFields.size > 1) {
             // Need to fail if there is ambiguity, i.e. more than one field is matched.
             val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
-            reader.close()
             throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
               requiredFieldName, matchedOrcFieldsString)
           } else {
@@ -285,18 +281,17 @@ object OrcUtils extends Logging {
    * Given a `StructType` object, this methods converts it to corresponding string representation
    * in ORC.
    */
-  def orcTypeDescriptionString(dt: DataType): String = dt match {
+  def getOrcSchemaString(dt: DataType): String = dt match {
     case s: StructType =>
       val fieldTypes = s.fields.map { f =>
-        s"${quoteIdentifier(f.name)}:${orcTypeDescriptionString(f.dataType)}"
+        s"${quoteIdentifier(f.name)}:${getOrcSchemaString(f.dataType)}"
       }
       s"struct<${fieldTypes.mkString(",")}>"
     case a: ArrayType =>
-      s"array<${orcTypeDescriptionString(a.elementType)}>"
+      s"array<${getOrcSchemaString(a.elementType)}>"
     case m: MapType =>
-      s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
-    case TimestampNTZType => TypeDescription.Category.TIMESTAMP.getName
-    case _: DayTimeIntervalType => LongType.catalogString
+      s"map<${getOrcSchemaString(m.keyType)},${getOrcSchemaString(m.valueType)}>"
+    case _: DayTimeIntervalType | _: TimestampNTZType => LongType.catalogString
     case _: YearMonthIntervalType => IntegerType.catalogString
     case _ => dt.catalogString
  }
@@ -306,16 +301,14 @@
     dt match {
       case y: YearMonthIntervalType =>
         val typeDesc = new TypeDescription(TypeDescription.Category.INT)
-        typeDesc.setAttribute(
-          CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
+        typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, y.typeName)
         Some(typeDesc)
       case d: DayTimeIntervalType =>
         val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
-        typeDesc.setAttribute(
-          CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
+        typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, d.typeName)
         Some(typeDesc)
       case n: TimestampNTZType =>
-        val typeDesc = new TypeDescription(TypeDescription.Category.TIMESTAMP)
+        val typeDesc = new TypeDescription(TypeDescription.Category.LONG)
         typeDesc.setAttribute(CATALYST_TYPE_ATTRIBUTE_NAME, n.typeName)
         Some(typeDesc)
       case t: TimestampType =>
@@ -378,9 +371,9 @@ object OrcUtils extends Logging {
       partitionSchema: StructType,
       conf: Configuration): String = {
     val resultSchemaString = if (canPruneCols) {
-      OrcUtils.orcTypeDescriptionString(resultSchema)
+      OrcUtils.getOrcSchemaString(resultSchema)
     } else {
-      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+      OrcUtils.getOrcSchemaString(StructType(dataSchema.fields ++ partitionSchema.fields))
     }
     OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     resultSchemaString
@@ -532,17 +525,4 @@ object OrcUtils extends Logging {
       resultRow
     }
   }
-
-  def fromOrcNTZ(ts: Timestamp): Long = {
-    DateTimeUtils.millisToMicros(ts.getTime) +
-      (ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-  }
-
-  def toOrcNTZ(micros: Long): OrcTimestamp = {
-    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
-    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
-    val result = new OrcTimestamp(seconds * MILLIS_PER_SECOND)
-    result.setNanos(nanos.toInt)
-    result
-  }
 }
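On the write side, the schema mapping above declares an NTZ column to ORC as a plain bigint, while the TypeDescription construction records the Catalyst type in a type attribute so Spark readers can restore timestamp_ntz. A sketch of the resulting ORC schema (the attribute key is assumed to be Spark's CATALYST_TYPE_ATTRIBUTE_NAME, i.e. "spark.sql.catalyst.type"):

    import org.apache.orc.TypeDescription

    object NtzSchemaSketch {
      def main(args: Array[String]): Unit = {
        // Physically a LONG column; the attribute carries the logical type.
        val ntzColumn = new TypeDescription(TypeDescription.Category.LONG)
        ntzColumn.setAttribute("spark.sql.catalyst.type", "timestamp_ntz")

        val schema = new TypeDescription(TypeDescription.Category.STRUCT)
          .addField("ts_ntz1", ntzColumn)
        println(schema) // struct<ts_ntz1:bigint>, modulo attribute rendering
      }
    }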
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 63ad5ed..a955dd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -358,6 +358,8 @@ private[parquet] class ParquetRowConverter(
     case StringType =>
       new ParquetStringConverter(updater)
 
+    // As long as the parquet type is INT64 timestamp, whether logical annotation
+    // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
     case TimestampType
       if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
         parquetType.getLogicalTypeAnnotation
@@ -368,6 +370,8 @@ private[parquet] class ParquetRowConverter(
         }
       }
 
+    // As long as the parquet type is INT64 timestamp, whether logical annotation
+    // `isAdjustedToUTC` is false or true, it will be read as Spark's TimestampLTZ type
     case TimestampType
       if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
         parquetType.getLogicalTypeAnnotation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index ec6a3bb..ef13bea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -88,11 +88,9 @@ case class OrcPartitionReaderFactory(
     }
     val filePath = new Path(new URI(file.filePath))
 
-    val resultedColPruneInfo =
-      Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
-      }
+    val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
+    val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+      isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)
 
     if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader[InternalRow]
@@ -131,11 +129,9 @@ case class OrcPartitionReaderFactory(
     }
     val filePath = new Path(new URI(file.filePath))
 
-    val resultedColPruneInfo =
-      Utils.tryWithResource(createORCReader(filePath, conf)) { reader =>
-        OrcUtils.requestedColumnIds(
-          isCaseSensitive, dataSchema, readDataSchema, reader, conf)
-      }
+    val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema)
+    val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+      isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)
 
     if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
index 1ac9266e..63c20ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala
@@ -43,7 +43,7 @@ case class OrcWrite(
     val conf = job.getConfiguration
 
-    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.orcTypeDescriptionString(dataSchema))
+    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getOrcSchemaString(dataSchema))
 
     conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
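The reader-side refactor repeated in OrcFileFormat and OrcPartitionReaderFactory is the same pattern: pull only the TypeDescription out while the Reader is open, so requestedColumnIds no longer receives a live Reader that it must remember to close on error paths (hence the removed reader.close() above). The shape of it, with an illustrative path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.orc.{OrcFile, TypeDescription}

    // Open the ORC file just long enough to copy out its schema; everything
    // downstream works on the TypeDescription, not the Reader.
    def readOrcSchema(path: Path, conf: Configuration): TypeDescription = {
      val reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))
      try reader.getSchema finally reader.close()
    }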
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 49b7cfa..f093a5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -35,6 +35,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -803,6 +804,31 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-37463: read/write Timestamp ntz to Orc with different time zone") {
+    DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) {
+      val sqlText = """
+        |select
+        | timestamp_ntz '2021-06-01 00:00:00' ts_ntz1,
+        | timestamp_ntz '1883-11-16 00:00:00.0' as ts_ntz2,
+        | timestamp_ntz '2021-03-14 02:15:00.0' as ts_ntz3
+        |""".stripMargin
+
+      val df = sql(sqlText)
+
+      df.write.mode("overwrite").orc("ts_ntz_orc")
+
+      val query = "select * from `orc`.`ts_ntz_orc`"
+
+      DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+        DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+          withAllNativeOrcReaders {
+            checkAnswer(sql(query), df)
+          }
+        }
+      }
+    }
+  }
 }
 
 class OrcV1QuerySuite extends OrcQuerySuite {
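At the DataFrame level, the behavior the new test pins down looks roughly like this (a sketch assuming an active SparkSession named spark; the output path is illustrative):

    import java.util.TimeZone

    // Write NTZ values under one default time zone...
    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
    val df = spark.sql("select timestamp_ntz '2021-06-01 00:00:00' as ts_ntz1")
    df.write.mode("overwrite").orc("/tmp/ts_ntz_orc")

    // ...and read them back under another: with the int64 encoding the
    // wall-clock value comes back unchanged.
    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
    spark.read.orc("/tmp/ts_ntz_orc").show() // 2021-06-01 00:00:00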