This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2a1267a  [SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2
2a1267a is described below

commit 2a1267aeb75bf838c74d1cf274aa258be060c17b
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Nov 10 15:21:33 2021 +0300

    [SPARK-37261][SQL] Allow adding partitions with ANSI intervals in DSv2

    ### What changes were proposed in this pull request?
    In the PR, I propose to skip the check of ANSI interval types while creating or writing to a table using V2 catalogs. As a consequence, users can create tables in V2 catalogs partitioned by ANSI interval columns (the legacy intervals of `CalendarIntervalType` are still prohibited). This PR also adds a new test which checks:
    1. Adding a new partition with ANSI intervals via `ALTER TABLE .. ADD PARTITION`
    2. INSERT INTO a table partitioned by ANSI intervals
    for the V1/V2 In-Memory catalogs (the V1 Hive external catalog is skipped).

    ### Why are the changes needed?
    To allow users to save ANSI intervals as partition values using DSv2.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    By running the new test for the V1/V2 In-Memory and V1 Hive external catalogs:
    ```
    $ build/sbt "test:testOnly org.apache.spark.sql.execution.command.v1.AlterTableAddPartitionSuite"
    $ build/sbt "test:testOnly org.apache.spark.sql.execution.command.v2.AlterTableAddPartitionSuite"
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly org.apache.spark.sql.hive.execution.command.AlterTableAddPartitionSuite"
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DataSourceV2SQLSuite"
    ```

    Closes #34537 from MaxGekk/alter-table-ansi-interval.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 ++--
 .../apache/spark/sql/catalyst/util/TypeUtils.scala |  4 +--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 16 +++++----
 .../command/AlterTableAddPartitionSuiteBase.scala  | 40 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 5bf37a2..1a105ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -464,10 +464,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
         failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}")
       }
 
-      create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+      create.tableSchema.foreach(f =>
+        TypeUtils.failWithIntervalType(f.dataType, forbidAnsiIntervals = false))
 
     case write: V2WriteCommand if write.resolved =>
-      write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
+      write.query.schema.foreach(f =>
+        TypeUtils.failWithIntervalType(f.dataType, forbidAnsiIntervals = false))
 
     case alter: AlterTableCommand =>
       checkAlterTableCommand(alter)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index cba3a9a..144508c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -98,8 +98,8 @@ object TypeUtils {
     case _ => false
   }
 
-  def failWithIntervalType(dataType: DataType): Unit = {
-    invokeOnceForInterval(dataType, forbidAnsiIntervals = true) {
+  def failWithIntervalType(dataType: DataType, forbidAnsiIntervals: Boolean = true): Unit = {
+    invokeOnceForInterval(dataType, forbidAnsiIntervals) {
       throw QueryCompilationErrors.cannotUseIntervalTypeInTableSchemaError()
     }
  }
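[Editor's note] The whole mechanism above is the new `forbidAnsiIntervals` parameter on `TypeUtils.failWithIntervalType`, which defaults to `true` so every existing call site keeps rejecting ANSI intervals, while the two DSv2 checks in `CheckAnalysis` opt out explicitly. Below is a minimal, self-contained sketch of that pattern, runnable in the Scala REPL; the `DataType` hierarchy in it is a simplified hypothetical stand-in, not Spark's real API.

```
// Hypothetical, simplified stand-in for Spark's DataType hierarchy,
// only to illustrate the default-parameter pattern used in this commit.
sealed trait DataType
case object StringType extends DataType
case object YearMonthIntervalType extends DataType // ANSI interval
case object CalendarIntervalType extends DataType  // legacy interval
final case class ArrayType(elementType: DataType) extends DataType

def failWithIntervalType(dataType: DataType, forbidAnsiIntervals: Boolean = true): Unit =
  dataType match {
    case CalendarIntervalType =>
      // The legacy interval type stays banned in table schemas unconditionally.
      throw new IllegalArgumentException("Cannot use interval type in the table schema.")
    case YearMonthIntervalType if forbidAnsiIntervals =>
      // ANSI intervals are rejected only when the caller asks for it.
      throw new IllegalArgumentException("Cannot use interval type in the table schema.")
    case ArrayType(element) =>
      failWithIntervalType(element, forbidAnsiIntervals) // intervals nested in arrays count too
    case _ => // non-interval leaf types are fine
  }

failWithIntervalType(StringType)                                          // ok
failWithIntervalType(YearMonthIntervalType, forbidAnsiIntervals = false) // ok: the new DSv2 path
// failWithIntervalType(YearMonthIntervalType)                            // would throw: V1 default kept
// failWithIntervalType(CalendarIntervalType, forbidAnsiIntervals = false) // would still throw
```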
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f03792f..499638c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -340,13 +340,15 @@ class DataSourceV2SQLSuite
   }
 
   test("CTAS/RTAS: invalid schema if has interval type") {
-    Seq("CREATE", "REPLACE").foreach { action =>
-      val e1 = intercept[AnalysisException](
-        sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
-      assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
-      val e2 = intercept[AnalysisException](
-        sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
-      assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+    withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
+      Seq("CREATE", "REPLACE").foreach { action =>
+        val e1 = intercept[AnalysisException](
+          sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
+        assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
+        val e2 = intercept[AnalysisException](
+          sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
+        assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+      }
     }
  }
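[Editor's note] Why the CTAS/RTAS test above had to be wrapped in `withSQLConf`: with the default settings the literal `interval 1 day` parses as an ANSI `DayTimeIntervalType`, which this commit makes legal in DSv2 table schemas, so the expected `AnalysisException` would no longer fire. Only under `spark.sql.legacy.interval.enabled=true` does the literal become the still-prohibited `CalendarIntervalType`. A spark-shell style sketch of the same behavior; the `testcat` catalog name is an assumption standing in for any registered DSv2 catalog.

```
// Assumes a DSv2 catalog has been registered under the (hypothetical) name
// `testcat`, e.g. via spark.sql.catalog.testcat=<some TableCatalog class>.
spark.conf.set("spark.sql.legacy.interval.enabled", false) // the default
// `interval 1 day` is an ANSI DayTimeIntervalType, now allowed in the schema:
spark.sql("CREATE TABLE testcat.ns.ok AS SELECT interval 1 day AS dt")

spark.conf.set("spark.sql.legacy.interval.enabled", true)
// The same literal is now the legacy CalendarIntervalType and is still banned:
// spark.sql("CREATE TABLE testcat.ns.bad AS SELECT interval 1 day AS dt")
// => org.apache.spark.sql.AnalysisException: Cannot use interval type in the table schema.
```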
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index e2e1591..dee1495 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import java.time.{Duration, Period}
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.internal.SQLConf
 
@@ -189,4 +191,40 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils
       checkPartitions(t, Map("part" ->"2020-01-01"))
     }
   }
+
+  test("SPARK-37261: Add ANSI intervals as partition values") {
+    assume(!catalogVersion.contains("Hive")) // Hive catalog doesn't support the interval types
+
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(
+        s"""CREATE TABLE $t (
+           | ym INTERVAL YEAR,
+           | dt INTERVAL DAY,
+           | data STRING) $defaultUsing
+           |PARTITIONED BY (ym, dt)""".stripMargin)
+      sql(
+        s"""ALTER TABLE $t ADD PARTITION (
+           | ym = INTERVAL '100' YEAR,
+           | dt = INTERVAL '10' DAY
+           |) LOCATION 'loc'""".stripMargin)
+
+      checkPartitions(t, Map("ym" -> "INTERVAL '100' YEAR", "dt" -> "INTERVAL '10' DAY"))
+      checkLocation(t, Map("ym" -> "INTERVAL '100' YEAR", "dt" -> "INTERVAL '10' DAY"), "loc")
+
+      sql(
+        s"""INSERT INTO $t PARTITION (
+           | ym = INTERVAL '100' YEAR,
+           | dt = INTERVAL '10' DAY) SELECT 'aaa'""".stripMargin)
+      sql(
+        s"""INSERT INTO $t PARTITION (
+           | ym = INTERVAL '1' YEAR,
+           | dt = INTERVAL '-1' DAY) SELECT 'bbb'""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT ym, dt, data FROM $t"),
+        Seq(
+          Row(Period.ofYears(100), Duration.ofDays(10), "aaa"),
+          Row(Period.ofYears(1), Duration.ofDays(-1), "bbb")))
+    }
+  }
 }
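[Editor's note] For reference, the user-facing flow the new test exercises, translated out of the test helpers (`withNamespaceAndTable`, `$defaultUsing`, `checkPartitions`) into plain spark-shell form; as above, `testcat` is an assumed DSv2 catalog name.

```
// Assumes the same hypothetical `testcat` DSv2 catalog as above.
spark.sql("""
  CREATE TABLE testcat.ns.tbl (
    ym INTERVAL YEAR,
    dt INTERVAL DAY,
    data STRING)
  PARTITIONED BY (ym, dt)""")

// New in this commit: partition values may be ANSI intervals.
spark.sql("""
  ALTER TABLE testcat.ns.tbl ADD PARTITION (
    ym = INTERVAL '100' YEAR,
    dt = INTERVAL '10' DAY)""")

spark.sql("""
  INSERT INTO testcat.ns.tbl PARTITION (
    ym = INTERVAL '100' YEAR,
    dt = INTERVAL '10' DAY) SELECT 'aaa'""")

// ANSI interval columns surface as java.time values on read:
// INTERVAL YEAR as java.time.Period, INTERVAL DAY as java.time.Duration,
// hence the Period.ofYears/Duration.ofDays expectations in the test.
spark.sql("SELECT ym, dt, data FROM testcat.ns.tbl").show(truncate = false)
```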