[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87915156
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
+val numPartitions = options.numPartitions
+val repartitionedDF =
+  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
--- End diff --

I think increasing the number of partitions can improve insert performance in some scenarios. However, `repartition` is not cheap.
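
For context, a minimal sketch of the cost being discussed (assuming a running `SparkSession` named `spark`; the DataFrame is illustrative):

```scala
// `repartition` always triggers a full shuffle of the data, so forcing a
// new partition count on every JDBC write is not free.
val df = spark.range(0, 1000000L).toDF("id")
println(df.rdd.getNumPartitions)      // e.g. 8 with master local[8]

val widened = df.repartition(200)     // full shuffle: data moves across the cluster
println(widened.rdd.getNumPartitions) // 200
```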



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87913599
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
+val numPartitions = options.numPartitions
+val repartitionedDF =
+  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
+df.repartition(numPartitions.toInt)
--- End diff --

Is it OK to use `coalesce` here?
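
For reference, a minimal sketch of the `coalesce` trade-off (names are illustrative, assuming a `SparkSession` named `spark`):

```scala
// `coalesce(n)` merges existing partitions without a shuffle, which is cheap,
// but it can only reduce the partition count; asking for more partitions
// than the DataFrame already has leaves the count unchanged.
val df = spark.range(0, 1000000L).toDF("id")   // assume 8 partitions
println(df.coalesce(2).rdd.getNumPartitions)   // 2: merged without a shuffle
println(df.coalesce(100).rdd.getNumPartitions) // still 8: coalesce cannot grow
```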



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87910285
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
+val numPartitions = options.numPartitions
+val repartitionedDF =
+  if (numPartitions != null && numPartitions.toInt != df.rdd.getNumPartitions) {
--- End diff --

Normally, based on my understanding, users only care about the maximum number of connections. Thus, there is no need to repartition when `numPartitions.toInt >= df.rdd.getNumPartitions`, right?
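
A sketch of the guard this suggests, treating `numPartitions` as a cap on connections rather than an exact count (variable names follow the diff above; the use of `coalesce` here is an assumption, not the PR's code):

```scala
// Hypothetical variant: only shrink when the DataFrame exceeds the cap,
// and use coalesce so the shrink needs no shuffle.
val repartitionedDF =
  if (numPartitions != null && numPartitions.toInt < df.rdd.getNumPartitions) {
    df.coalesce(numPartitions.toInt) // reduce to the cap without a shuffle
  } else {
    df                               // already at or under the cap
  }
```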



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87909790
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala ---
@@ -70,6 +70,9 @@ class JDBCOptions(
 }
   }
 
+  // the number of partitions
--- End diff --

This is not clear. The documentation needs an update:

http://spark.apache.org/docs/latest/sql-programming-guide.html



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87875679
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
+val numPartitions = options.numPartitions
+val repartitionedDF = if (numPartitions != null &&
+numPartitions.toInt != df.rdd.getNumPartitions) {
--- End diff --

Tiny style point -- breaking the if statement that way looks a little funny 
to my eyes. I might do ...

```
val repartitionedDF =
  if (...) {
...
```



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87872317
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,7 +667,14 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
+val numPartitions = options.numPartitions
+val repartitionedDF = if (numPartitions != null &&
+numPartitions.toInt != df.rdd.getNumPartitions) {
+  df.repartition(numPartitions.toInt)
+} else {
+  df
+}
+repartitionedDF.foreachPartition(iterator => savePartition(
--- End diff --

Now, `foreachPartition` is outside of `if..else..`.



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87857574
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,9 +667,15 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
-  getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
-)
+if (options.numPartitions != null && options.numPartitions.toInt != df.rdd.getNumPartitions) {
+  df.repartition(options.numPartitions.toInt).foreachPartition(iterator => savePartition(
--- End diff --

Thank you for the review, @srowen.

First, the property `numPartitions` already exists in [JDBCOptions.scala: Optional parameters only for reading](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala#L73-L83). This PR makes that option meaningful during write operations.

Second, for DataFrame use cases, we can call `repartition` before saving to manage this; in fact, I suggested that approach to @lichenglin. But the main purpose of the issue reported by @lichenglin is to provide a pure SQL way to control the number of partitions when writing. In SQL, this looks reasonable to me.
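
For the DataFrame case mentioned above, a minimal sketch of repartitioning before the save (the URL, table, and credentials are placeholders):

```scala
import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "")

// DataFrame users can already cap the number of JDBC connections by
// repartitioning before the write; pure SQL (INSERT into a JDBC-backed
// view) has no equivalent hook, which is what this PR provides.
df.repartition(8)
  .write
  .mode("overwrite")
  .jdbc("jdbc:mysql://localhost:3306/t", "t1", props)
```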



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15868#discussion_r87785862
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -667,9 +667,15 @@ object JdbcUtils extends Logging {
 val getConnection: () => Connection = createConnectionFactory(options)
 val batchSize = options.batchSize
 val isolationLevel = options.isolationLevel
-df.foreachPartition(iterator => savePartition(
-  getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
-)
+if (options.numPartitions != null && options.numPartitions.toInt != df.rdd.getNumPartitions) {
+  df.repartition(options.numPartitions.toInt).foreachPartition(iterator => savePartition(
--- End diff --

This repeats the `foreachPartition` part twice, when it could be outside the if-else.

I don't know enough to say whether we should add this property. It seems a little odd to set this globally so that it applies to all DataFrames written to a database. I understand the use case is pure SQL, where perhaps that's the only option.



[GitHub] spark pull request #15868: [SPARK-18413][SQL] Control the number of JDBC con...

2016-11-12 Thread dongjoon-hyun
GitHub user dongjoon-hyun opened a pull request:

https://github.com/apache/spark/pull/15868

[SPARK-18413][SQL] Control the number of JDBC connections by repartition with `numPartition` JDBCOption

## What changes were proposed in this pull request?

This PR aims to control the number of JDBC connections by repartitioning with `numPartition` (the `JDBC_NUM_PARTITIONS` JDBCOption). Currently, `JDBC_NUM_PARTITIONS` is documented as an optional parameter for reading only.

**Reported Scenario**

In the following case, the number of connections becomes 200 and the database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:@10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Tested manually, using the `Client Connections` tab of MySQL Workbench. After creating table `t1` with `CREATE TABLE t1 (a INT)` in MySQL, do the following.

```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar

scala> sql("CREATE OR REPLACE TEMPORARY VIEW v1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '1')")
scala> sql("INSERT OVERWRITE TABLE v1 SELECT 1").show
scala> sql("CREATE OR REPLACE TEMPORARY VIEW v1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', numPartitions '20')")
scala> sql("INSERT OVERWRITE TABLE v1 SELECT 1").show
```

The total number of connections increases by 3 and 20 for the first and second insertion, respectively.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjoon-hyun/spark SPARK-18413

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15868


commit e29974a08da14b63aec1f6fc9798bac532e4d97f
Author: Dongjoon Hyun 
Date:   2016-11-13T04:07:39Z

[SPARK-18413][SQL] Control the number of JDBC connections by repartition with `numPartition` JDBCOption



