spark git commit: [SPARK-4613][Core] Java API for JdbcRDD
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 bfba8bf60 -> 092800435

[SPARK-4613][Core] Java API for JdbcRDD

This PR introduces a set of Java APIs for using `JdbcRDD`:

1. Trait (interface) `JdbcRDD.ConnectionFactory`: equivalent to the `getConnection: () => Connection` parameter in the `JdbcRDD` constructor.
2. Two overloaded versions of `JdbcRDD.create`: used to create a `JavaRDD` that wraps a `JdbcRDD`.

[Review on Reviewable](https://reviewable.io/reviews/apache/spark/3478)

Author: Cheng Lian

Closes #3478 from liancheng/japi-jdbc-rdd and squashes the following commits:

9a54625 [Cheng Lian] Only shutdowns a single DB rather than the whole Derby driver
d4cedc5 [Cheng Lian] Moves Java JdbcRDD test case to a separate test suite
ffcdf2e [Cheng Lian] Java API for JdbcRDD

(cherry picked from commit 120a350240f58196eafcb038ca3a353636d89239)
Signed-off-by: Matei Zaharia

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09280043
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09280043
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09280043

Branch: refs/heads/branch-1.2
Commit: 092800435c27c97bf445de934826a131dfba
Parents: bfba8bf
Author: Cheng Lian
Authored: Thu Nov 27 18:01:14 2014 -0800
Committer: Matei Zaharia
Committed: Thu Nov 27 18:01:26 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |  84 ++++++++++-
 .../java/org/apache/spark/JavaJdbcRDDSuite.java | 118 ++++++++++++
 .../org/apache/spark/rdd/JdbcRDDSuite.scala     |   7 +-
 3 files changed, 204 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/09280043/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 0e38f22..642a12c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -21,8 +21,11 @@ import java.sql.{Connection, ResultSet}
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.util.NextIterator
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 
 private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
   override def index = idx
@@ -125,5 +128,82 @@ object JdbcRDD {
   def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
-}
+
+  trait ConnectionFactory extends Serializable {
+    @throws[Exception]
+    def getConnection: Connection
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results.
+   * For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
+   *
+   * @param connectionFactory a factory that returns an open Connection.
+   *   The RDD takes care of closing the connection.
+   * @param sql the text of the query.
+   *   The query must contain two ? placeholders for parameters used to partition the results.
+   *   E.g. "select title, author from books where ? <= id and id <= ?"
+   * @param lowerBound the minimum value of the first placeholder
+   * @param upperBound the maximum value of the second placeholder
+   *   The lower and upper bounds are inclusive.
+   * @param numPartitions the number of partitions.
+   *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+   *   the query would be executed twice, once with (1, 10) and once with (11, 20)
+   * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+   *   This should only call getInt, getString, etc; the RDD takes care of calling next.
+   *   The default maps a ResultSet to an array of Object.
+   */
+  def create[T](
+      sc: JavaSparkContext,
+      connectionFactory: ConnectionFactory,
+      sql: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
+
+    val jdbcRDD = new JdbcRDD[T](
+      sc.sc,
+      () => connectionFactory.getConnection,
+      sql,
+      lowerBound,
+      upperBound,
+      numPartitions,
+      (resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)
+
+    new JavaRDD[T](jdbcRDD)(fakeClassTag)
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results.
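The trait `JdbcRDD.ConnectionFactory` and the `create` overload above are exercised by the JavaJdbcRDDSuite added in this commit (its diff is cut off in this message). Below is a minimal sketch of the Java-side usage; the class name, the JDBC URL, and the PEOPLE(ID, NAME) table are illustrative assumptions, not part of the commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

public class JdbcRDDExample {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local", "JdbcRDDExample");

    // ConnectionFactory stands in for the Scala `getConnection: () => Connection`
    // constructor parameter; it extends Serializable so it can be shipped to executors.
    JdbcRDD.ConnectionFactory connectionFactory = new JdbcRDD.ConnectionFactory() {
      @Override
      public Connection getConnection() throws Exception {
        // Hypothetical in-memory Derby database.
        return DriverManager.getConnection("jdbc:derby:memory:exampledb");
      }
    };

    // The query carries the two required `?` placeholders, bound per partition.
    JavaRDD<String> names = JdbcRDD.create(
        sc,
        connectionFactory,
        "SELECT NAME FROM PEOPLE WHERE ? <= ID AND ID <= ?",
        1,   // lowerBound (inclusive)
        100, // upperBound (inclusive)
        3,   // numPartitions
        new Function<ResultSet, String>() {
          @Override
          public String call(ResultSet rs) throws Exception {
            // Only read column values here; JdbcRDD drives ResultSet.next().
            return rs.getString(1);
          }
        });

    System.out.println(names.count());
    sc.stop();
  }
}

The anonymous classes (rather than lambdas) match the Java 7 era of this commit; and because the RDD closes each connection itself, the factory only has to open one.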
spark git commit: [SPARK-4613][Core] Java API for JdbcRDD
Repository: spark
Updated Branches:
  refs/heads/master 84376d313 -> 120a35024

[SPARK-4613][Core] Java API for JdbcRDD

This PR introduces a set of Java APIs for using `JdbcRDD`:

1. Trait (interface) `JdbcRDD.ConnectionFactory`: equivalent to the `getConnection: () => Connection` parameter in the `JdbcRDD` constructor.
2. Two overloaded versions of `JdbcRDD.create`: used to create a `JavaRDD` that wraps a `JdbcRDD`.

[Review on Reviewable](https://reviewable.io/reviews/apache/spark/3478)

Author: Cheng Lian

Closes #3478 from liancheng/japi-jdbc-rdd and squashes the following commits:

9a54625 [Cheng Lian] Only shutdowns a single DB rather than the whole Derby driver
d4cedc5 [Cheng Lian] Moves Java JdbcRDD test case to a separate test suite
ffcdf2e [Cheng Lian] Java API for JdbcRDD

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/120a3502
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/120a3502
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/120a3502

Branch: refs/heads/master
Commit: 120a350240f58196eafcb038ca3a353636d89239
Parents: 84376d3
Author: Cheng Lian
Authored: Thu Nov 27 18:01:14 2014 -0800
Committer: Matei Zaharia
Committed: Thu Nov 27 18:01:14 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |  84 ++++++++++-
 .../java/org/apache/spark/JavaJdbcRDDSuite.java | 118 ++++++++++++
 .../org/apache/spark/rdd/JdbcRDDSuite.scala     |   7 +-
 3 files changed, 204 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/120a3502/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 0e38f22..642a12c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -21,8 +21,11 @@ import java.sql.{Connection, ResultSet}
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.util.NextIterator
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 
 private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
   override def index = idx
@@ -125,5 +128,82 @@ object JdbcRDD {
   def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
-}
+
+  trait ConnectionFactory extends Serializable {
+    @throws[Exception]
+    def getConnection: Connection
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results.
+   * For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
+   *
+   * @param connectionFactory a factory that returns an open Connection.
+   *   The RDD takes care of closing the connection.
+   * @param sql the text of the query.
+   *   The query must contain two ? placeholders for parameters used to partition the results.
+   *   E.g. "select title, author from books where ? <= id and id <= ?"
+   * @param lowerBound the minimum value of the first placeholder
+   * @param upperBound the maximum value of the second placeholder
+   *   The lower and upper bounds are inclusive.
+   * @param numPartitions the number of partitions.
+   *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+   *   the query would be executed twice, once with (1, 10) and once with (11, 20)
+   * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+   *   This should only call getInt, getString, etc; the RDD takes care of calling next.
+   *   The default maps a ResultSet to an array of Object.
+   */
+  def create[T](
+      sc: JavaSparkContext,
+      connectionFactory: ConnectionFactory,
+      sql: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
+
+    val jdbcRDD = new JdbcRDD[T](
+      sc.sc,
+      () => connectionFactory.getConnection,
+      sql,
+      lowerBound,
+      upperBound,
+      numPartitions,
+      (resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)
+
+    new JavaRDD[T](jdbcRDD)(fakeClassTag)
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results. Each row is
+   * converted into a `Object` array. For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
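The second `create` overload, whose javadoc is truncated above, omits `mapRow` and falls back to `resultSetToObjectArray`, so each row comes back as an `Object[]` with one entry per column. Continuing the hypothetical sketch after the first message (same `sc` and `connectionFactory`, illustrative table and bounds):

// mapRow-less overload: rows are materialized via JdbcRDD.resultSetToObjectArray.
JavaRDD<Object[]> rows = JdbcRDD.create(
    sc,
    connectionFactory,
    "SELECT ID, NAME FROM PEOPLE WHERE ? <= ID AND ID <= ?",
    1,    // lowerBound (inclusive)
    100,  // upperBound (inclusive)
    3);   // numPartitions

// Per the javadoc's partitioning rule, the bounds 1..100 are split into three
// contiguous ranges, so the query runs once per partition, roughly with
// (1, 33), (34, 66), and (67, 100) bound to the two ? placeholders.
for (Object[] row : rows.collect()) {
  System.out.println(row[0] + "\t" + row[1]);
}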