[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87915156 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,7 +667,14 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( +val numPartitions = options.numPartitions +val repartitionedDF = + if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) { --- End diff -- Increasing the number of partitions can improve the insert performance in some scenarios, I think. However, `repartition` is not cheap. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87913599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,7 +667,14 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( +val numPartitions = options.numPartitions +val repartitionedDF = + if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) { +df.repartition(numPartitions.toInt) --- End diff -- Is that ok to use `coalesce` here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87910285 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,7 +667,14 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( +val numPartitions = options.numPartitions +val repartitionedDF = + if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) { --- End diff -- Normally, based on my understanding, users only cares the maximal number of connections. Thus, no need to repartition it when `numPartitions.toInt >= df.rdd.getNumPartitions`, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87909790 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -70,6 +70,9 @@ class JDBCOptions( } } + // the number of partitions --- End diff -- This is not clear. The document needs an update. http://spark.apache.org/docs/latest/sql-programming-guide.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87875679 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,7 +667,14 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( +val numPartitions = options.numPartitions +val repartitionedDF = if (numPartitions != null && +numPartitions.toInt != df.rdd.getNumPartitions) { --- End diff -- Tiny style point -- breaking the if statement that way looks a little funny to my eyes. I might do ... ``` val repartitionedDF = if (...) { ... ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87872317 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,7 +667,14 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( +val numPartitions = options.numPartitions +val repartitionedDF = if (numPartitions != null && +numPartitions.toInt != df.rdd.getNumPartitions) { + df.repartition(numPartitions.toInt) +} else { + df +} +repartitionedDF.foreachPartition(iterator => savePartition( --- End diff -- Now, `foreachPartition` is outside of `if..else..`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87857574 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,9 +667,15 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( - getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel) -) +if (options.numPartitions != null && options.numPartitions.toInt != df.rdd.getNumPartitions) { + df.repartition(options.numPartitions.toInt).foreachPartition(iterator => savePartition( --- End diff -- Thank you for review, @srowen . First, the property `numPartitions` already exists in [JDBCOptions.scala: Optional parameters only for reading](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala#L73-L83). This PR makes that option meaningful during write operation. Second, for dataframe usecases, we can call `repartition` before saving to manage this. Actually, I asked @lichenglin that way. But, the main purpose of issue requested by @lichenglin is about providing pure SQL way to control the number of partitions for writing. In SQL, this looks reasonable to me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/15868#discussion_r87785862 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala --- @@ -667,9 +667,15 @@ object JdbcUtils extends Logging { val getConnection: () => Connection = createConnectionFactory(options) val batchSize = options.batchSize val isolationLevel = options.isolationLevel -df.foreachPartition(iterator => savePartition( - getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel) -) +if (options.numPartitions != null && options.numPartitions.toInt != df.rdd.getNumPartitions) { + df.repartition(options.numPartitions.toInt).foreachPartition(iterator => savePartition( --- End diff -- This repeats the `foreachPartition` part twice, when that could be outside the if-else. I don't know enough to say whether we should add this property. It seems a little funny to set this globally to apply to all dataframes written to a database. I understand the use case is pure SQL, where perhaps that's the only option. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...
GitHub user dongjoon-hyun opened a pull request: https://github.com/apache/spark/pull/15868 [SPARK-18413][SQL] Control the number of JDBC connections by repartition with `numPartition` JDBCOption ## What changes were proposed in this pull request? This PR aims to control the number of JDBC connections by repartition with `numPartition` (`JDBC_NUM_PARTITIONS` JDBCOption). Currently, `JDBC_NUM_PARTITIONS` is documented as an optional parameter for only reading. **Reported Scenario** For the following cases, the number of connections becomes 200 and database cannot handle all of them. ```sql CREATE OR REPLACE TEMPORARY VIEW resultview USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:oracle:thin:@10.129.10.111:1521:BKDB", dbtable "result", user "HIVE", password "HIVE" ); -- set spark.sql.shuffle.partitions=200 INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g ``` ## How was this patch tested? Manual by using `Client Connection Tab` of `MySQLWorkbench`. After creating table t1 by `CREATE TABLE t1 (a INT)` in MySQL, do the following. ```scala SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar scala> sql("CREATE OR REPLACE TEMPORARY VIEW v1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '1')") scala> sql("INSERT OVERWRITE TABLE v1 SELECT 1").show scala> sql("CREATE OR REPLACE TEMPORARY VIEW v1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '20')") scala> sql("INSERT OVERWRITE TABLE v1 SELECT 1").show ``` The total connection is increased by 3 and 20, respectively for the first and second insertion. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dongjoon-hyun/spark SPARK-18413 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15868.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15868 commit e29974a08da14b63aec1f6fc9798bac532e4d97f Author: Dongjoon Hyun Date: 2016-11-13T04:07:39Z [SPARK-18413][SQL] Control the number of JDBC connections by repartition with `numPartition` JDBCOption --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org