Repository: spark Updated Branches: refs/heads/master b90bfe3c4 -> 47d84e4d0
[SPARK-22814][SQL] Support Date/Timestamp in a JDBC partition column ## What changes were proposed in this pull request? This PR adds support for Date/Timestamp columns as a JDBC partition column (on master, only numeric columns are supported). It also modifies the code to verify the partition column type, so that an unsupported type is reported as an `AnalysisException` rather than a `NumberFormatException`: ``` val jdbcTable = spark.read .option("partitionColumn", "text") .option("lowerBound", "aaa") .option("upperBound", "zzz") .option("numPartitions", 2) .jdbc("jdbc:postgresql:postgres", "t", options) // with this pr org.apache.spark.sql.AnalysisException: Partition column type should be numeric, date, or timestamp, but string found.; at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.verifyAndGetNormalizedPartitionColumn(JDBCRelation.scala:165) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:85) at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317) // without this pr java.lang.NumberFormatException: For input string: "aaa" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Long.parseLong(Long.java:589) at java.lang.Long.parseLong(Long.java:631) at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277) ``` Closes #19999 ## How was this patch tested? Added tests in `JDBCSuite`. Author: Takeshi Yamamuro <yamam...@apache.org> Closes #21834 from maropu/SPARK-22814. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47d84e4d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47d84e4d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47d84e4d Branch: refs/heads/master Commit: 47d84e4d0e56e14f9402770dceaf0b4302c00e98 Parents: b90bfe3 Author: Takeshi Yamamuro <yamam...@apache.org> Authored: Mon Jul 30 07:42:00 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Mon Jul 30 07:42:00 2018 -0700 ---------------------------------------------------------------------- docs/sql-programming-guide.md | 4 +- .../spark/sql/jdbc/OracleIntegrationSuite.scala | 86 +++++++++++++-- .../spark/sql/catalyst/util/DateTimeUtils.scala | 10 +- .../datasources/PartitioningUtils.scala | 2 +- .../datasources/jdbc/JDBCOptions.scala | 4 +- .../datasources/jdbc/JDBCRelation.scala | 107 +++++++++++++++---- .../datasources/jdbc/JdbcRelationProvider.scala | 21 +--- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 77 ++++++++++++- 8 files changed, 258 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/docs/sql-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4b013c6..cff521c 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1345,8 +1345,8 @@ the following case-insensitive options: These options must all be specified if any of them is specified. In addition, <code>numPartitions</code> must be specified. They describe how to partition the table when reading in parallel from multiple workers. - <code>partitionColumn</code> must be a numeric column from the table in question. 
Notice - that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the + <code>partitionColumn</code> must be a numeric, date, or timestamp column from the table in question. + Notice that <code>lowerBound</code> and <code>upperBound</code> are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading. </td> http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala index 8512496..09a2cd8 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala @@ -17,12 +17,14 @@ package org.apache.spark.sql.jdbc +import java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} import java.util.{Properties, TimeZone} -import java.math.BigDecimal -import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode} +import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCRelation} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -86,7 +88,8 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo conn.prepareStatement( "CREATE TABLE tableWithCustomSchema (id 
NUMBER, n1 NUMBER(1), n2 NUMBER(1))").executeUpdate() conn.prepareStatement( - "INSERT INTO tableWithCustomSchema values(12312321321321312312312312123, 1, 0)").executeUpdate() + "INSERT INTO tableWithCustomSchema values(12312321321321312312312312123, 1, 0)") + .executeUpdate() conn.commit() sql( @@ -108,15 +111,36 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo """.stripMargin.replaceAll("\n", " ")) - conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))").executeUpdate() + conn.prepareStatement("CREATE TABLE numerics (b DECIMAL(1), f DECIMAL(3, 2), i DECIMAL(10))") + .executeUpdate() conn.prepareStatement( "INSERT INTO numerics VALUES (4, 1.23, 9999999999)").executeUpdate() conn.commit() - conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)").executeUpdate() + conn.prepareStatement("CREATE TABLE oracle_types (d BINARY_DOUBLE, f BINARY_FLOAT)") + .executeUpdate() conn.commit() - } + conn.prepareStatement("CREATE TABLE datetimePartitionTest (id NUMBER(10), d DATE, t TIMESTAMP)") + .executeUpdate() + conn.prepareStatement( + """INSERT INTO datetimePartitionTest VALUES + |(1, {d '2018-07-06'}, {ts '2018-07-06 05:50:00'}) + """.stripMargin.replaceAll("\n", " ")).executeUpdate() + conn.prepareStatement( + """INSERT INTO datetimePartitionTest VALUES + |(2, {d '2018-07-06'}, {ts '2018-07-06 08:10:08'}) + """.stripMargin.replaceAll("\n", " ")).executeUpdate() + conn.prepareStatement( + """INSERT INTO datetimePartitionTest VALUES + |(3, {d '2018-07-08'}, {ts '2018-07-08 13:32:01'}) + """.stripMargin.replaceAll("\n", " ")).executeUpdate() + conn.prepareStatement( + """INSERT INTO datetimePartitionTest VALUES + |(4, {d '2018-07-12'}, {ts '2018-07-12 09:51:15'}) + """.stripMargin.replaceAll("\n", " ")).executeUpdate() + conn.commit() + } test("SPARK-16625 : Importing Oracle numeric types") { val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties) @@ -399,4 
+423,54 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo assert(values.getDouble(0) === 1.1) assert(values.getFloat(1) === 2.2f) } + + test("SPARK-22814 support date/timestamp types in partitionColumn") { + val expectedResult = Set( + (1, "2018-07-06", "2018-07-06 05:50:00"), + (2, "2018-07-06", "2018-07-06 08:10:08"), + (3, "2018-07-08", "2018-07-08 13:32:01"), + (4, "2018-07-12", "2018-07-12 09:51:15") + ).map { case (id, date, timestamp) => + Row(BigDecimal.valueOf(id), Date.valueOf(date), Timestamp.valueOf(timestamp)) + } + + // DateType partition column + val df1 = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "datetimePartitionTest") + .option("partitionColumn", "d") + .option("lowerBound", "2018-07-06") + .option("upperBound", "2018-07-20") + .option("numPartitions", 3) + .load() + + df1.logicalPlan match { + case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => + val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet + assert(whereClauses === Set( + """"D" < '2018-07-10' or "D" is null""", + """"D" >= '2018-07-10' AND "D" < '2018-07-14'""", + """"D" >= '2018-07-14'""")) + } + assert(df1.collect.toSet === expectedResult) + + // TimestampType partition column + val df2 = spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "datetimePartitionTest") + .option("partitionColumn", "t") + .option("lowerBound", "2018-07-04 03:30:00.0") + .option("upperBound", "2018-07-27 14:11:05.0") + .option("numPartitions", 2) + .load() + + df2.logicalPlan match { + case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => + val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet + assert(whereClauses === Set( + """"T" < '2018-07-15 20:50:32.5' or "T" is null""", + """"T" >= '2018-07-15 20:50:32.5'""")) + } + assert(df2.collect.toSet === expectedResult) + } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 80f1505..02813d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -96,9 +96,9 @@ object DateTimeUtils { } } - def getThreadLocalDateFormat(): DateFormat = { + def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = { val sdf = threadLocalDateFormat.get() - sdf.setTimeZone(defaultTimeZone()) + sdf.setTimeZone(timeZone) sdf } @@ -144,7 +144,11 @@ object DateTimeUtils { } def dateToString(days: SQLDate): String = - getThreadLocalDateFormat.format(toJavaDate(days)) + getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days)) + + def dateToString(days: SQLDate, timeZone: TimeZone): String = { + getThreadLocalDateFormat(timeZone).format(toJavaDate(days)) + } // Converts Timestamp to string according to Hive TimestampWritable convention. 
def timestampToString(us: SQLTimestamp): String = { http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index f9a2480..c8a5f98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -410,7 +410,7 @@ object PartitioningUtils { val dateTry = Try { // try and parse the date, if no exception occurs this is a candidate to be resolved as // DateType - DateTimeUtils.getThreadLocalDateFormat.parse(raw) + DateTimeUtils.getThreadLocalDateFormat(DateTimeUtils.defaultTimeZone()).parse(raw) // SPARK-23436: Casting the string to date may still return null if a bad Date is provided. // This can happen since DateFormat.parse may not use the entire text of the given string: // so if there are extra-characters after the date, it returns correctly. 
http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index d80efce..7dfbb9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -119,9 +119,9 @@ class JDBCOptions( // the column used to partition val partitionColumn = parameters.get(JDBC_PARTITION_COLUMN) // the lower bound of partition column - val lowerBound = parameters.get(JDBC_LOWER_BOUND).map(_.toLong) + val lowerBound = parameters.get(JDBC_LOWER_BOUND) // the upper bound of the partition column - val upperBound = parameters.get(JDBC_UPPER_BOUND).map(_.toLong) + val upperBound = parameters.get(JDBC_UPPER_BOUND) // numPartitions is also used for data source writing require((partitionColumn.isEmpty && lowerBound.isEmpty && upperBound.isEmpty) || (partitionColumn.isDefined && lowerBound.isDefined && upperBound.isDefined && http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index 4f78f59..f150144 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -17,6 +17,8 @@ package 
org.apache.spark.sql.execution.datasources.jdbc +import java.sql.{Date, Timestamp} + import scala.collection.mutable.ArrayBuffer import org.apache.spark.Partition @@ -24,9 +26,10 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType} import org.apache.spark.util.Utils /** @@ -34,6 +37,7 @@ import org.apache.spark.util.Utils */ private[sql] case class JDBCPartitioningInfo( column: String, + columnType: DataType, lowerBound: Long, upperBound: Long, numPartitions: Int) @@ -51,16 +55,43 @@ private[sql] object JDBCRelation extends Logging { * the rows with null value for the partitions column. 
* * @param schema resolved schema of a JDBC table - * @param partitioning partition information to generate the where clause for each partition * @param resolver function used to determine if two identifiers are equal + * @param timeZoneId timezone ID to be used if a partition column type is date or timestamp * @param jdbcOptions JDBC options that contains url * @return an array of partitions with where clause for each partition */ def columnPartition( schema: StructType, - partitioning: JDBCPartitioningInfo, resolver: Resolver, + timeZoneId: String, jdbcOptions: JDBCOptions): Array[Partition] = { + val partitioning = { + import JDBCOptions._ + + val partitionColumn = jdbcOptions.partitionColumn + val lowerBound = jdbcOptions.lowerBound + val upperBound = jdbcOptions.upperBound + val numPartitions = jdbcOptions.numPartitions + + if (partitionColumn.isEmpty) { + assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not " + + s"specified, '$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty") + null + } else { + assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty, + s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " + + s"'$JDBC_NUM_PARTITIONS' are also required") + + val (column, columnType) = verifyAndGetNormalizedPartitionColumn( + schema, partitionColumn.get, resolver, jdbcOptions) + + val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType) + val upperBoundValue = toInternalBoundValue(upperBound.get, columnType) + JDBCPartitioningInfo( + column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get) + } + } + if (partitioning == null || partitioning.numPartitions <= 1 || partitioning.lowerBound == partitioning.upperBound) { return Array[Partition](JDBCPartition(null, 0)) @@ -72,6 +103,8 @@ private[sql] object JDBCRelation extends Logging { "Operation not allowed: the lower bound of partitioning column is larger than the upper " + s"bound. 
Lower bound: $lowerBound; Upper bound: $upperBound") + val boundValueToString: Long => String = + toBoundValueInWhereClause(_, partitioning.columnType, timeZoneId) val numPartitions = if ((upperBound - lowerBound) >= partitioning.numPartitions || /* check for overflow */ (upperBound - lowerBound) < 0) { @@ -80,24 +113,25 @@ private[sql] object JDBCRelation extends Logging { logWarning("The number of partitions is reduced because the specified number of " + "partitions is less than the difference between upper bound and lower bound. " + s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " + - s"partitions: ${partitioning.numPartitions}; Lower bound: $lowerBound; " + - s"Upper bound: $upperBound.") + s"partitions: ${partitioning.numPartitions}; " + + s"Lower bound: ${boundValueToString(lowerBound)}; " + + s"Upper bound: ${boundValueToString(upperBound)}.") upperBound - lowerBound } // Overflow and silliness can happen if you subtract then divide. // Here we get a little roundoff, but that's (hopefully) OK. 
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions - val column = verifyAndGetNormalizedColumnName( - schema, partitioning.column, resolver, jdbcOptions) - var i: Int = 0 - var currentValue: Long = lowerBound + val column = partitioning.column + var currentValue = lowerBound val ans = new ArrayBuffer[Partition]() while (i < numPartitions) { - val lBound = if (i != 0) s"$column >= $currentValue" else null + val lBoundValue = boundValueToString(currentValue) + val lBound = if (i != 0) s"$column >= $lBoundValue" else null currentValue += stride - val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null + val uBoundValue = boundValueToString(currentValue) + val uBound = if (i != numPartitions - 1) s"$column < $uBoundValue" else null val whereClause = if (uBound == null) { lBound @@ -109,23 +143,58 @@ private[sql] object JDBCRelation extends Logging { ans += JDBCPartition(whereClause, i) i = i + 1 } - ans.toArray + val partitions = ans.toArray + logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " + + partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", ")) + partitions } - // Verify column name based on the JDBC resolved schema - private def verifyAndGetNormalizedColumnName( + // Verify column name and type based on the JDBC resolved schema + private def verifyAndGetNormalizedPartitionColumn( schema: StructType, columnName: String, resolver: Resolver, - jdbcOptions: JDBCOptions): String = { + jdbcOptions: JDBCOptions): (String, DataType) = { val dialect = JdbcDialects.get(jdbcOptions.url) - schema.map(_.name).find { fieldName => - resolver(fieldName, columnName) || - resolver(dialect.quoteIdentifier(fieldName), columnName) - }.map(dialect.quoteIdentifier).getOrElse { + val column = schema.find { f => + resolver(f.name, columnName) || resolver(dialect.quoteIdentifier(f.name), columnName) + }.getOrElse { throw new AnalysisException(s"User-defined partition column $columnName not 
" + s"found in the JDBC relation: ${schema.simpleString(Utils.maxNumToStringFields)}") } + column.dataType match { + case _: NumericType | DateType | TimestampType => + case _ => + throw new AnalysisException( + s"Partition column type should be ${NumericType.simpleString}, " + + s"${DateType.catalogString}, or ${TimestampType.catalogString}, but " + + s"${column.dataType.catalogString} found.") + } + (dialect.quoteIdentifier(column.name), column.dataType) + } + + private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match { + case _: NumericType => value.toLong + case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong + case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value)) + } + + private def toBoundValueInWhereClause( + value: Long, + columnType: DataType, + timeZoneId: String): String = { + def dateTimeToString(): String = { + val timeZone = DateTimeUtils.getTimeZone(timeZoneId) + val dateTimeStr = columnType match { + case DateType => DateTimeUtils.dateToString(value.toInt, timeZone) + case TimestampType => DateTimeUtils.timestampToString(value, timeZone) + } + s"'$dateTimeStr'" + } + columnType match { + case _: NumericType => value.toString + case DateType | TimestampType => dateTimeToString() + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 782d626..e7456f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -29,28 
+29,11 @@ class JdbcRelationProvider extends CreatableRelationProvider override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { - import JDBCOptions._ - val jdbcOptions = new JDBCOptions(parameters) - val partitionColumn = jdbcOptions.partitionColumn - val lowerBound = jdbcOptions.lowerBound - val upperBound = jdbcOptions.upperBound - val numPartitions = jdbcOptions.numPartitions - - val partitionInfo = if (partitionColumn.isEmpty) { - assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not specified, " + - s"'$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty") - null - } else { - assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty, - s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " + - s"'$JDBC_NUM_PARTITIONS' are also required") - JDBCPartitioningInfo( - partitionColumn.get, lowerBound.get, upperBound.get, numPartitions.get) - } val resolver = sqlContext.conf.resolver + val timeZoneId = sqlContext.conf.sessionLocalTimeZone val schema = JDBCRelation.getSchema(resolver, jdbcOptions) - val parts = JDBCRelation.columnPartition(schema, partitionInfo, resolver, jdbcOptions) + val parts = JDBCRelation.columnPartition(schema, resolver, timeZoneId, jdbcOptions) JDBCRelation(schema, parts, jdbcOptions)(sqlContext.sparkSession) } http://git-wip-us.apache.org/repos/asf/spark/blob/47d84e4d/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 0edbd3a..7fa0e7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -24,7 +24,7 @@ import java.util.{Calendar, GregorianCalendar, Properties} import 
org.h2.jdbc.JdbcSQLException import org.scalatest.{BeforeAndAfter, PrivateMethodTester} -import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -244,6 +244,17 @@ class JDBCSuite extends QueryTest .executeUpdate() conn.commit() + conn.prepareStatement("CREATE TABLE test.datetime (d DATE, t TIMESTAMP)").executeUpdate() + conn.prepareStatement( + "INSERT INTO test.datetime VALUES ('2018-07-06', '2018-07-06 05:50:00.0')").executeUpdate() + conn.prepareStatement( + "INSERT INTO test.datetime VALUES ('2018-07-06', '2018-07-06 08:10:08.0')").executeUpdate() + conn.prepareStatement( + "INSERT INTO test.datetime VALUES ('2018-07-08', '2018-07-08 13:32:01.0')").executeUpdate() + conn.prepareStatement( + "INSERT INTO test.datetime VALUES ('2018-07-12', '2018-07-12 09:51:15.0')").executeUpdate() + conn.commit() + // Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types. 
} @@ -1375,7 +1386,71 @@ class JDBCSuite extends QueryTest checkAnswer( sql("select name, theid from queryOption"), Row("fred", 1) :: Nil) + } + + test("SPARK-22814 support date/timestamp types in partitionColumn") { + val expectedResult = Seq( + ("2018-07-06", "2018-07-06 05:50:00.0"), + ("2018-07-06", "2018-07-06 08:10:08.0"), + ("2018-07-08", "2018-07-08 13:32:01.0"), + ("2018-07-12", "2018-07-12 09:51:15.0") + ).map { case (date, timestamp) => + Row(Date.valueOf(date), Timestamp.valueOf(timestamp)) + } + + // DateType partition column + val df1 = spark.read.format("jdbc") + .option("url", urlWithUserAndPass) + .option("dbtable", "TEST.DATETIME") + .option("partitionColumn", "d") + .option("lowerBound", "2018-07-06") + .option("upperBound", "2018-07-20") + .option("numPartitions", 3) + .load() + df1.logicalPlan match { + case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => + val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet + assert(whereClauses === Set( + """"D" < '2018-07-10' or "D" is null""", + """"D" >= '2018-07-10' AND "D" < '2018-07-14'""", + """"D" >= '2018-07-14'""")) + } + checkAnswer(df1, expectedResult) + + // TimestampType partition column + val df2 = spark.read.format("jdbc") + .option("url", urlWithUserAndPass) + .option("dbtable", "TEST.DATETIME") + .option("partitionColumn", "t") + .option("lowerBound", "2018-07-04 03:30:00.0") + .option("upperBound", "2018-07-27 14:11:05.0") + .option("numPartitions", 2) + .load() + + df2.logicalPlan match { + case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) => + val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet + assert(whereClauses === Set( + """"T" < '2018-07-15 20:50:32.5' or "T" is null""", + """"T" >= '2018-07-15 20:50:32.5'""")) + } + checkAnswer(df2, expectedResult) + } + + test("throws an exception for unsupported partition column types") { + val errMsg = intercept[AnalysisException] { + spark.read.format("jdbc") + .option("url", 
urlWithUserAndPass) + .option("dbtable", "TEST.PEOPLE") + .option("partitionColumn", "name") + .option("lowerBound", "aaa") + .option("upperBound", "zzz") + .option("numPartitions", 2) + .load() + }.getMessage + assert(errMsg.contains( + "Partition column type should be numeric, date, or timestamp, but string found.")) } test("SPARK-24288: Enable preventing predicate pushdown") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org