This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 44e2657f3d5  [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options

44e2657f3d5 is described below

commit 44e2657f3d511c25135c95dc3d584c540d227b5b
Author:     Prashant Singh <psing...@amazon.com>
AuthorDate: Thu Jun 30 17:16:32 2022 -0700

    [SPARK-39633][SQL] Support timestamp in seconds for TimeTravel using Dataframe options

    ### What changes were proposed in this pull request?

    Support a timestamp given in seconds for time travel using DataFrame options.

    ### Why are the changes needed?

    To bring time travel via DataFrame options to parity with time travel via SQL. Spark SQL already supports queries like:

    ```sql
    SELECT * FROM {table} TIMESTAMP AS OF 1548751078
    ```

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Added new UTs covering the behaviour.

    Closes #37025 from singhpk234/fix/timetravel_df_options.

    Authored-by: Prashant Singh <psing...@amazon.com>
    Signed-off-by: huaxingao <huaxin_...@apple.com>
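To make the SQL/DataFrame parity concrete, here is a minimal usage sketch. It is not part of the patch: the provider class name and the `timestampAsOf` option key are assumptions, since each `SupportsCatalogOptions` connector chooses its own option names and surfaces the value through `extractTimeTravelTimestamp`.

```scala
import org.apache.spark.sql.SparkSession

object TimeTravelOptionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // SQL path: a bare integer is already read as seconds since the epoch.
    spark.sql("SELECT * FROM testcat.t TIMESTAMP AS OF 1548751078").show()

    // DataFrame path: with this patch, an all-digit option value is treated
    // the same way instead of being parsed (and rejected) as a timestamp string.
    spark.read
      .format("com.example.CatalogOptionsProvider") // hypothetical connector
      .option("timestampAsOf", "1548751078")        // hypothetical option key
      .load()
      .show()
  }
}
```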
---
 .../sql/execution/datasources/v2/DataSourceV2Utils.scala   | 12 ++++++++++--
 .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala  | 11 +++++++++++
 .../spark/sql/connector/SupportsCatalogOptionsSuite.scala  |  7 +++++++
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index f69a2a45886..7fd61c44fd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SessionConfigSuppo
 import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 private[sql] object DataSourceV2Utils extends Logging {
@@ -124,7 +124,15 @@ private[sql] object DataSourceV2Utils extends Logging {
         val timestamp = hasCatalog.extractTimeTravelTimestamp(dsOptions)
 
         val timeTravelVersion = if (version.isPresent) Some(version.get) else None
-        val timeTravelTimestamp = if (timestamp.isPresent) Some(Literal(timestamp.get)) else None
+        val timeTravelTimestamp = if (timestamp.isPresent) {
+          if (timestamp.get.forall(_.isDigit)) {
+            Some(Literal(timestamp.get.toLong, LongType))
+          } else {
+            Some(Literal(timestamp.get))
+          }
+        } else {
+          None
+        }
         val timeTravel = TimeTravelSpec.create(timeTravelTimestamp, timeTravelVersion, conf)
         (CatalogV2Util.loadTable(catalog, ident, timeTravel).get, Some(catalog), Some(ident))
       case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 9c92c1d9a0b..c82d875faa7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -21,6 +21,7 @@ import java.sql.Timestamp
 import java.time.{Duration, LocalDate, Period}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.MICROSECONDS
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -2591,6 +2592,8 @@ class DataSourceV2SQLSuite
     val ts2 = DateTimeUtils.stringToTimestampAnsi(
       UTF8String.fromString("2021-01-29 00:00:00"),
       DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone))
+    val ts1InSeconds = MICROSECONDS.toSeconds(ts1).toString
+    val ts2InSeconds = MICROSECONDS.toSeconds(ts2).toString
     val t3 = s"testcat.t$ts1"
     val t4 = s"testcat.t$ts2"
 
@@ -2607,6 +2610,14 @@ class DataSourceV2SQLSuite
       === Array(Row(5), Row(6)))
     assert(sql("SELECT * FROM t TIMESTAMP AS OF '2021-01-29 00:00:00'").collect
       === Array(Row(7), Row(8)))
+    assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts1InSeconds").collect
+      === Array(Row(5), Row(6)))
+    assert(sql(s"SELECT * FROM t TIMESTAMP AS OF $ts2InSeconds").collect
+      === Array(Row(7), Row(8)))
+    assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts1InSeconds").collect
+      === Array(Row(5), Row(6)))
+    assert(sql(s"SELECT * FROM t FOR SYSTEM_TIME AS OF $ts2InSeconds").collect
+      === Array(Row(7), Row(8)))
     assert(sql("SELECT * FROM t TIMESTAMP AS OF make_date(2021, 1, 29)").collect
       === Array(Row(7), Row(8)))
     assert(sql("SELECT * FROM t TIMESTAMP AS OF to_timestamp('2021-01-29 00:00:00')").collect
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index fc843a86aa5..f8278d18b0a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector
 
 import java.util.Optional
 
+import scala.concurrent.duration.MICROSECONDS
 import scala.language.implicitConversions
 import scala.util.Try
 
@@ -322,6 +323,12 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
         timestamp = Some("2019-01-29 00:37:58")), df3.toDF())
       checkAnswer(load("t", Some(catalogName), version = None,
         timestamp = Some("2021-01-29 00:37:58")), df4.toDF())
+
+      // load with timestamp in number format
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some(MICROSECONDS.toSeconds(ts1).toString)), df3.toDF())
+      checkAnswer(load("t", Some(catalogName), version = None,
+        timestamp = Some(MICROSECONDS.toSeconds(ts2).toString)), df4.toDF())
     }
 
     val e = intercept[AnalysisException] {
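A closing note on the semantics the diff relies on implicitly: the time-travel literal is eventually cast to `TimestampType` (via `TimeTravelSpec.create`), and with ANSI mode off Spark's cast reads an integral value as seconds since the epoch, while a string is parsed as a timestamp string, so digits like "1548751078" would not parse. A quick sketch of that cast difference, assuming a local session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// With ANSI mode off (the default), the integral cast yields the epoch-second
// instant, while the same digits in a string fail to parse and come back NULL.
spark.sql(
  """SELECT CAST(1548751078 AS TIMESTAMP) AS from_long,
    |       CAST('1548751078' AS TIMESTAMP) AS from_string""".stripMargin).show()
// from_long renders as 2019-01-29 (in the session time zone); from_string is NULL.
```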