Repository: spark
Updated Branches:
  refs/heads/master dcfe0c5cd -> bb220f657
[SPARK-10040] [SQL] Use batch insert for JDBC writing

JIRA: https://issues.apache.org/jira/browse/SPARK-10040

We should use batch inserts instead of single-row inserts when writing over JDBC.

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #8273 from viirya/jdbc-insert-batch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb220f65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb220f65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb220f65

Branch: refs/heads/master
Commit: bb220f6570aa0b95598b30524224a3e82c1effbc
Parents: dcfe0c5
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Fri Aug 21 01:43:49 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Aug 21 01:43:49 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb220f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 2d0e736..26788b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -88,13 +88,15 @@ object JdbcUtils extends Logging {
       table: String,
       iterator: Iterator[Row],
       rddSchema: StructType,
-      nullTypes: Array[Int]): Iterator[Byte] = {
+      nullTypes: Array[Int],
+      batchSize: Int): Iterator[Byte] = {
     val conn = getConnection()
     var committed = false
     try {
       conn.setAutoCommit(false) // Everything in the same db transaction.
       val stmt = insertStatement(conn, table, rddSchema)
       try {
+        var rowCount = 0
         while (iterator.hasNext) {
           val row = iterator.next()
           val numFields = rddSchema.fields.length
@@ -122,7 +124,15 @@ object JdbcUtils extends Logging {
             }
             i = i + 1
           }
-          stmt.executeUpdate()
+          stmt.addBatch()
+          rowCount += 1
+          if (rowCount % batchSize == 0) {
+            stmt.executeBatch()
+            rowCount = 0
+          }
+        }
+        if (rowCount > 0) {
+          stmt.executeBatch()
         }
       } finally {
         stmt.close()
@@ -211,8 +221,9 @@ object JdbcUtils extends Logging {
     val rddSchema = df.schema
     val driver: String = DriverRegistry.getDriverClassName(url)
     val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
+    val batchSize = properties.getProperty("batchsize", "1000").toInt
    df.foreachPartition { iterator =>
-      savePartition(getConnection, table, iterator, rddSchema, nullTypes)
+      savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize)
     }
   }
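----------------------------------------------------------------------

For context, a minimal usage sketch (not part of the patch): the new knob is read
from the JDBC connection properties under the key "batchsize" and defaults to 1000
when unset, as the diff above shows. The URL, table name, credentials, and toy data
below are illustrative placeholders only.

  import java.util.Properties

  import org.apache.spark.sql.SQLContext

  // Sketch, assuming a running SQLContext and a reachable database;
  // everything here except the "batchsize" key is a placeholder.
  def writeWithBatchSize(sqlContext: SQLContext): Unit = {
    val df = sqlContext.range(0L, 10000L)   // toy DataFrame with a single "id" column

    val props = new Properties()
    props.setProperty("user", "spark")      // placeholder credentials
    props.setProperty("password", "secret")
    // Buffer 500 rows via PreparedStatement.addBatch() before each
    // executeBatch(), instead of one executeUpdate() round trip per row.
    props.setProperty("batchsize", "500")

    df.write.jdbc("jdbc:postgresql://localhost/testdb", "people", props)
  }

Larger batches amortize network round trips. Note that the whole partition is still
written in one transaction (setAutoCommit(false) above), so the batch size controls
how often rows are shipped to the database, not transactional granularity.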