spark git commit: [SPARK-4613][Core] Java API for JdbcRDD

2014-11-27 Thread matei
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 bfba8bf60 -> 092800435


[SPARK-4613][Core] Java API for JdbcRDD

This PR introduces a set of Java APIs for using `JdbcRDD`:

1. Trait (interface) `JdbcRDD.ConnectionFactory`: equivalent to the 
`getConnection: () => Connection` parameter in the `JdbcRDD` constructor.
2. Two overloaded versions of `JdbcRDD.create`: used to create a `JavaRDD` that 
wraps a `JdbcRDD` (a hedged usage sketch is shown after this list).
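
As a rough illustration of how the new API is meant to be called from Java, here 
is a minimal, hypothetical sketch (the in-memory Derby URL, the `PEOPLE` table, 
and the column names are made up for illustration and are not part of this 
patch; the actual usage example is the new `JavaJdbcRDDSuite`):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

public class JdbcRDDExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "JdbcRDDExample");

    JavaRDD<String> names = JdbcRDD.create(
      sc,
      // ConnectionFactory stands in for the Scala `getConnection: () => Connection` argument.
      new JdbcRDD.ConnectionFactory() {
        @Override
        public Connection getConnection() throws SQLException {
          return DriverManager.getConnection("jdbc:derby:memory:exampleDb"); // hypothetical DB
        }
      },
      // The query must contain two ? placeholders used to partition the results.
      "SELECT NAME FROM PEOPLE WHERE ? <= ID AND ID <= ?",
      1,    // lowerBound: bound to the first placeholder (inclusive)
      100,  // upperBound: bound to the second placeholder (inclusive)
      3,    // numPartitions: [1, 100] is split into 3 sub-ranges, one query per partition
      // mapRow should only read columns; JdbcRDD itself advances the ResultSet.
      new Function<ResultSet, String>() {
        @Override
        public String call(ResultSet rs) throws Exception {
          return rs.getString(1);
        }
      });

    System.out.println(names.count());
    sc.stop();
  }
}
```

With `lowerBound = 1`, `upperBound = 100`, and `numPartitions = 3`, the bounds 
are split into three inclusive sub-ranges, roughly (1, 33), (34, 66) and 
(67, 100), and the query runs once per partition with that sub-range bound to 
the two placeholders.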


[Review on Reviewable](https://reviewable.io/reviews/apache/spark/3478)


Author: Cheng Lian 

Closes #3478 from liancheng/japi-jdbc-rdd and squashes the following commits:

9a54625 [Cheng Lian] Only shutdowns a single DB rather than the whole Derby 
driver
d4cedc5 [Cheng Lian] Moves Java JdbcRDD test case to a separate test suite
ffcdf2e [Cheng Lian] Java API for JdbcRDD

(cherry picked from commit 120a350240f58196eafcb038ca3a353636d89239)
Signed-off-by: Matei Zaharia 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09280043
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09280043
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09280043

Branch: refs/heads/branch-1.2
Commit: 092800435c27c97bf445de934826a131dfba
Parents: bfba8bf
Author: Cheng Lian 
Authored: Thu Nov 27 18:01:14 2014 -0800
Committer: Matei Zaharia 
Committed: Thu Nov 27 18:01:26 2014 -0800

--
 .../scala/org/apache/spark/rdd/JdbcRDD.scala|  84 -
 .../java/org/apache/spark/JavaJdbcRDDSuite.java | 118 +++
 .../org/apache/spark/rdd/JdbcRDDSuite.scala |   7 +-
 3 files changed, 204 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/09280043/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 0e38f22..642a12c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -21,8 +21,11 @@ import java.sql.{Connection, ResultSet}
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.util.NextIterator
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 
 private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
   override def index = idx
@@ -125,5 +128,82 @@ object JdbcRDD {
   def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
-}
 
+  trait ConnectionFactory extends Serializable {
+    @throws[Exception]
+    def getConnection: Connection
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results.
+   * For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
+   *
+   * @param connectionFactory a factory that returns an open Connection.
+   *   The RDD takes care of closing the connection.
+   * @param sql the text of the query.
+   *   The query must contain two ? placeholders for parameters used to partition the results.
+   *   E.g. "select title, author from books where ? <= id and id <= ?"
+   * @param lowerBound the minimum value of the first placeholder
+   * @param upperBound the maximum value of the second placeholder
+   *   The lower and upper bounds are inclusive.
+   * @param numPartitions the number of partitions.
+   *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+   *   the query would be executed twice, once with (1, 10) and once with (11, 20)
+   * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+   *   This should only call getInt, getString, etc; the RDD takes care of calling next.
+   *   The default maps a ResultSet to an array of Object.
+   */
+  def create[T](
+      sc: JavaSparkContext,
+      connectionFactory: ConnectionFactory,
+      sql: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
+
+    val jdbcRDD = new JdbcRDD[T](
+      sc.sc,
+      () => connectionFactory.getConnection,
+      sql,
+      lowerBound,
+      upperBound,
+      numPartitions,
+      (resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)
+
+    new JavaRDD[T](jdbcRDD)(fakeClassTag)
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connec

spark git commit: [SPARK-4613][Core] Java API for JdbcRDD

2014-11-27 Thread matei
Repository: spark
Updated Branches:
  refs/heads/master 84376d313 -> 120a35024


[SPARK-4613][Core] Java API for JdbcRDD

This PR introduces a set of Java APIs for using `JdbcRDD`:

1. Trait (interface) `JdbcRDD.ConnectionFactory`: equivalent to the 
`getConnection: () => Connection` parameter in the `JdbcRDD` constructor 
(a hedged sketch of a standalone implementation is shown after this list).
2. Two overloaded versions of `JdbcRDD.create`: used to create a `JavaRDD` that 
wraps a `JdbcRDD`.
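
Since `ConnectionFactory` extends `Serializable`, the factory instance is 
shipped to executors as part of the task closure. The sketch below (the class 
name and JDBC URL are made up for illustration) shows a standalone 
implementation; keeping the factory a top-level or static nested class avoids 
accidentally capturing a non-serializable enclosing instance:

```java
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.spark.rdd.JdbcRDD;

// Hypothetical reusable factory; JdbcRDD calls getConnection() once per
// partition and takes care of closing the connection when the partition
// has been fully read.
public class SimpleConnectionFactory implements JdbcRDD.ConnectionFactory {
  private final String jdbcUrl;

  public SimpleConnectionFactory(String jdbcUrl) {
    this.jdbcUrl = jdbcUrl;
  }

  @Override
  public Connection getConnection() throws Exception {
    return DriverManager.getConnection(jdbcUrl);
  }
}
```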


[Review on Reviewable](https://reviewable.io/reviews/apache/spark/3478)


Author: Cheng Lian 

Closes #3478 from liancheng/japi-jdbc-rdd and squashes the following commits:

9a54625 [Cheng Lian] Only shutdowns a single DB rather than the whole Derby 
driver
d4cedc5 [Cheng Lian] Moves Java JdbcRDD test case to a separate test suite
ffcdf2e [Cheng Lian] Java API for JdbcRDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/120a3502
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/120a3502
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/120a3502

Branch: refs/heads/master
Commit: 120a350240f58196eafcb038ca3a353636d89239
Parents: 84376d3
Author: Cheng Lian 
Authored: Thu Nov 27 18:01:14 2014 -0800
Committer: Matei Zaharia 
Committed: Thu Nov 27 18:01:14 2014 -0800

--
 .../scala/org/apache/spark/rdd/JdbcRDD.scala|  84 -
 .../java/org/apache/spark/JavaJdbcRDDSuite.java | 118 +++
 .../org/apache/spark/rdd/JdbcRDDSuite.scala |   7 +-
 3 files changed, 204 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/120a3502/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 0e38f22..642a12c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -21,8 +21,11 @@ import java.sql.{Connection, ResultSet}
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.util.NextIterator
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 
 private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
   override def index = idx
@@ -125,5 +128,82 @@ object JdbcRDD {
   def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
     Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
   }
-}
 
+  trait ConnectionFactory extends Serializable {
+    @throws[Exception]
+    def getConnection: Connection
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results.
+   * For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
+   *
+   * @param connectionFactory a factory that returns an open Connection.
+   *   The RDD takes care of closing the connection.
+   * @param sql the text of the query.
+   *   The query must contain two ? placeholders for parameters used to partition the results.
+   *   E.g. "select title, author from books where ? <= id and id <= ?"
+   * @param lowerBound the minimum value of the first placeholder
+   * @param upperBound the maximum value of the second placeholder
+   *   The lower and upper bounds are inclusive.
+   * @param numPartitions the number of partitions.
+   *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+   *   the query would be executed twice, once with (1, 10) and once with (11, 20)
+   * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+   *   This should only call getInt, getString, etc; the RDD takes care of calling next.
+   *   The default maps a ResultSet to an array of Object.
+   */
+  def create[T](
+      sc: JavaSparkContext,
+      connectionFactory: ConnectionFactory,
+      sql: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
+
+    val jdbcRDD = new JdbcRDD[T](
+      sc.sc,
+      () => connectionFactory.getConnection,
+      sql,
+      lowerBound,
+      upperBound,
+      numPartitions,
+      (resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)
+
+    new JavaRDD[T](jdbcRDD)(fakeClassTag)
+  }
+
+  /**
+   * Create an RDD that executes an SQL query on a JDBC connection and reads results. Each row is
+   * converted into a `Object` array. For usage example, see test case