This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0be6b2e6abb  [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
0be6b2e6abb is described below

commit 0be6b2e6abb7ef9ce746be71db5f800fb49931a8
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Fri Mar 3 22:59:37 2023 -0400

    [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

    ### What changes were proposed in this pull request?
    Currently, the Connect project has a new `DataFrameReader` API that corresponds to the Spark `DataFrameReader` API, but the Connect `DataFrameReader` is missing the JDBC API.

    ### Why are the changes needed?
    This PR adds the JDBC API to the Connect `DataFrameReader`.

    ### Does this PR introduce _any_ user-facing change?
    No. This is a new feature.

    ### How was this patch tested?
    New test cases.

    Closes #40252 from beliefer/SPARK-42555.

    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 41c5e326eb4f45818c8227ab51e729c8402d995d)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../org/apache/spark/sql/DataFrameReader.scala     | 66 +++++++++++++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala | 20 ++++++-
 .../query-tests/explain-results/read_jdbc.explain  |  1 +
 .../read_jdbc_with_partition.explain               |  1 +
 .../resources/query-tests/queries/read_jdbc.json   | 14 +++++
 .../query-tests/queries/read_jdbc.proto.bin        | Bin 0 -> 101 bytes
 .../queries/read_jdbc_with_partition.json          | 18 ++++++
 .../queries/read_jdbc_with_partition.proto.bin     | Bin 0 -> 177 bytes
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   | 33 +++++++++++
 9 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3e17b03173b..43d6486f124 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql
 
+import java.util.Properties
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
@@ -184,6 +186,70 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
     }
   }
 
+  /**
+   * Construct a `DataFrame` representing the database table named `table`, accessible via the
+   * JDBC URL `url` and connection `properties`.
+   *
+   * You can find the JDBC-specific option and parameter documentation for reading tables via
+   * JDBC in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 3.4.0
+   */
+  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+    // properties should override settings in extraOptions.
+    this.extraOptions ++= properties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+    format("jdbc").load()
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Construct a `DataFrame` representing the database table named `table`, accessible via the
+   * JDBC URL `url`. Partitions of the table will be retrieved in parallel based on the
+   * parameters passed to this function.
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * You can find the JDBC-specific option and parameter documentation for reading tables via
+   * JDBC in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @param table
+   *   Name of the table in the external database.
+   * @param columnName
+   *   Alias of the `partitionColumn` option. Refer to `partitionColumn` in <a
+   *   href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   *   Data Source Option</a> in the version you use.
+   * @param connectionProperties
+   *   JDBC database connection arguments, a list of arbitrary string tag/value pairs. Normally
+   *   at least a "user" and "password" property should be included. "fetchsize" can be used to
+   *   control the number of rows per fetch, and "queryTimeout" can be used to limit the number
+   *   of seconds a Statement object is allowed to execute.
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      connectionProperties: Properties): DataFrame = {
+    // columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
+    this.extraOptions ++= Map(
+      "partitionColumn" -> columnName,
+      "lowerBound" -> lowerBound.toString,
+      "upperBound" -> upperBound.toString,
+      "numPartitions" -> numPartitions.toString)
+    jdbc(url, table, connectionProperties)
+  }
+
   /**
    * Loads a JSON file and returns the results as a `DataFrame`.
    *
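For context, the two new overloads boil down to plain option-setting on the reader; here is a minimal usage sketch (not part of the patch), assuming a `SparkSession` named `spark` and the illustrative H2 URL and table used by the tests below:

    import java.util.Properties

    // Normally at least "user" and "password" are supplied via the properties.
    val props = new Properties()
    props.setProperty("user", "testUser")
    props.setProperty("password", "testPass")

    // Whole-table read through a single partition.
    val df = spark.read.jdbc("jdbc:h2:mem:testdb0", "TEST.EMP", props)

    // Partitioned read: THEID is split into 3 stride ranges covering [0, 4).
    // lowerBound and upperBound only decide the partition stride; they do not
    // filter rows, so the complete table is still read.
    val partitioned = spark.read.jdbc(
      "jdbc:h2:mem:testdb0", "TEST.EMP", "THEID", 0L, 4L, 3, props)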
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 6e9583ae725..e8921ca776d 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql
 
 import java.nio.file.{Files, Path}
-import java.util.Collections
+import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
@@ -235,6 +235,24 @@ class PlanGenerationTestSuite
       .load(testDataPath.resolve("people.csv").toString)
   }
 
+  test("read jdbc") {
+    session.read.jdbc(
+      "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+      "TEST.TIMETYPES",
+      new Properties())
+  }
+
+  test("read jdbc with partition") {
+    session.read.jdbc(
+      "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
+      "TEST.EMP",
+      "THEID",
+      0,
+      4,
+      3,
+      new Properties())
+  }
+
   test("read json") {
     session.read.json(testDataPath.resolve("people.json").toString)
   }
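Since the implementation simply folds its arguments into `extraOptions`, the "read jdbc with partition" test case is equivalent to setting the JDBC data source options by hand; a sketch using the same `session`:

    val equivalent = session.read
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb0;user=testUser;password=testPass")
      .option("dbtable", "TEST.EMP")
      .option("partitionColumn", "THEID")
      .option("lowerBound", "0")
      .option("upperBound", "4")
      .option("numPartitions", "3")
      .load()

The reader's option map is case-insensitive, which is why the option keys appear lowercased in the serialized query JSON in the golden files below.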
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain
new file mode 100644
index 00000000000..c0e906176b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc.explain
@@ -0,0 +1 @@
+Relation [A#0,B#0,C#0] JDBCRelation(TEST.TIMETYPES) [numPartitions=1]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain
new file mode 100644
index 00000000000..e3ddb781bd2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_partition.explain
@@ -0,0 +1 @@
+Relation [NAME#0,THEID#0,Dept#0] JDBCRelation(TEST.EMP) [numPartitions=3]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json
new file mode 100644
index 00000000000..3e9b7b8cc86
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.json
@@ -0,0 +1,14 @@
+{
+  "common": {
+    "planId": "0"
+  },
+  "read": {
+    "dataSource": {
+      "format": "jdbc",
+      "options": {
+        "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+        "dbtable": "TEST.TIMETYPES"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin
new file mode 100644
index 00000000000..4e74a07d22f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json
new file mode 100644
index 00000000000..31576cee4f1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.json
@@ -0,0 +1,18 @@
+{
+  "common": {
+    "planId": "0"
+  },
+  "read": {
+    "dataSource": {
+      "format": "jdbc",
+      "options": {
+        "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+        "upperbound": "4",
+        "lowerbound": "0",
+        "numpartitions": "3",
+        "dbtable": "TEST.EMP",
+        "partitioncolumn": "THEID"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin
new file mode 100644
index 00000000000..c74178148de
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_partition.proto.bin differ
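The explain files above record the analyzed plan the server produces for these serialized reads. Roughly the same relation line can be reproduced by hand; a sketch, assuming a session `spark` with the H2 driver on its classpath and a database already containing the tables created by the server-side fixture below:

    val df = spark.read.jdbc(
      "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
      "TEST.EMP", "THEID", 0L, 4L, 3, new java.util.Properties())
    // explain(true) prints the analyzed logical plan among others; it should
    // contain a relation line like the golden file's (the golden files
    // normalize expression IDs to #0):
    //   Relation [NAME#0,THEID#0,Dept#0] JDBCRelation(TEST.EMP) [numPartitions=3]
    df.explain(true)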
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index f6e5442bd0c..142ae175090 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connect
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
 import java.nio.file.attribute.BasicFileAttributes
+import java.sql.DriverManager
 import java.util
 
 import scala.util.{Failure, Success, Try}
@@ -36,6 +37,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 // scalastyle:off
 /**
@@ -57,6 +59,37 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 // scalastyle:on
 class ProtoToParsedPlanTestSuite extends SparkFunSuite with SharedSparkSession {
+  val url = "jdbc:h2:mem:testdb0"
+  var conn: java.sql.Connection = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    Utils.classForName("org.h2.Driver")
+    // Extra properties that will be specified for our database. We need these to test
+    // usage of parameters from OPTIONS clause in queries.
+    val properties = new util.Properties()
+    properties.setProperty("user", "testUser")
+    properties.setProperty("password", "testPass")
+
+    conn = DriverManager.getConnection(url, properties)
+    conn.prepareStatement("create schema test").executeUpdate()
+    conn
+      .prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP(7))")
+      .executeUpdate()
+    conn
+      .prepareStatement(
+        "create table test.emp(name TEXT(32) NOT NULL," +
+          " theid INTEGER, \"Dept\" INTEGER)")
+      .executeUpdate()
+    conn.commit()
+  }
+
+  override def afterAll(): Unit = {
+    conn.close()
+    super.afterAll()
+  }
+
   override def sparkConf: SparkConf = {
     super.sparkConf
       .set(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org