[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80641407 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { +val partitionInfo = if (partitionColumn == null) { null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: --- End diff -- I also took a look at @gatorsmile 's approach, I think it's easier to understand, why it's rejected? We can also get rid of the `return`: ``` if (tableExists) { mode match { case SaveMode.Ignore => .. } } else { .. } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80628570 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { +val partitionInfo = if (partitionColumn == null) { null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: --- End diff -- If the table does not exist and the mode is `OVERWRITE`, we create a table, then insert rows into the table, and finally return a BaseRelation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80628287 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { +val partitionInfo = if (partitionColumn == null) { null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: --- End diff -- Now, at least, three of reviewers are confused of this bit. Do you mind if I submit a PR to clean up this part? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80627940 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,102 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { +val partitionInfo = if (partitionColumn == null) { null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: --- End diff -- what does this table mean? what is `CreateTable, saveTable, BaseRelation`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/12601 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80404639 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +210,84 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST")) +.save --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80404577 --- Diff: docs/sql-programming-guide.md --- @@ -1096,13 +1096,17 @@ the Data Sources API. The following options are supported: {% highlight sql %} -CREATE TEMPORARY VIEW jdbcTable +CREATE TEMPORARY TABLE jdbcTable --- End diff -- Done, thanks. I had been going off of the tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80353253 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -420,62 +420,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { assertNotPartitioned("jdbc") assertNotBucketed("jdbc") - -// to add required options like URL and dbtable -val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table) -val jdbcOptions = new JDBCOptions(params) -val jdbcUrl = jdbcOptions.url -val jdbcTable = jdbcOptions.table - -val props = new Properties() -extraOptions.foreach { case (key, value) => - props.put(key, value) -} // connectionProperties should override settings in extraOptions -props.putAll(connectionProperties) -val conn = JdbcUtils.createConnectionFactory(jdbcUrl, props)() - -try { - var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable) - - if (mode == SaveMode.Ignore && tableExists) { -return - } - - if (mode == SaveMode.ErrorIfExists && tableExists) { -sys.error(s"Table $jdbcTable already exists.") - } - - if (mode == SaveMode.Overwrite && tableExists) { -if (jdbcOptions.isTruncate && -JdbcUtils.isCascadingTruncateTable(jdbcUrl) == Some(false)) { - JdbcUtils.truncateTable(conn, jdbcTable) -} else { - JdbcUtils.dropTable(conn, jdbcTable) - tableExists = false -} - } - - // Create the table if the table didn't exist. - if (!tableExists) { -val schema = JdbcUtils.schemaString(df, jdbcUrl) -// To allow certain options to append when create a new table, which can be -// table_options or partition_options. -// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8" -val createtblOptions = jdbcOptions.createTableOptions -val sql = s"CREATE TABLE $jdbcTable ($schema) $createtblOptions" -val statement = conn.createStatement -try { - statement.executeUpdate(sql) -} finally { - statement.close() -} - } -} finally { - conn.close() -} - -JdbcUtils.saveTable(df, jdbcUrl, jdbcTable, props) +this.extraOptions = this.extraOptions ++ (connectionProperties.asScala) +// explicit url and dbtable should override all +this.extraOptions += ("url" -> url, "dbtable" -> table) +format("jdbc").save --- End diff -- The omission of parentheses on methods should only be used when the method has no side-effects. Thus, please change it to `save()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80353203 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +210,84 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST")) +.save --- End diff -- Nit: `save` -> `save()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80353010 --- Diff: docs/sql-programming-guide.md --- @@ -1096,13 +1096,17 @@ the Data Sources API. The following options are supported: {% highlight sql %} -CREATE TEMPORARY VIEW jdbcTable +CREATE TEMPORARY TABLE jdbcTable --- End diff -- Please change it back. `CREATE TEMPORARY TABLE` is deprecated. You will get a Parser error ``` CREATE TEMPORARY TABLE is not supported yet. Please use CREATE TEMPORARY VIEW as an alternative.(line 1, pos 0) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80352586 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java --- @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; // $example off:schema_merging$ --- End diff -- @HyukjinKwon Yes, that is what I was talking about...just fixed it back --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80352317 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java --- @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; // $example off:schema_merging$ --- End diff -- Oh, maybe, my previous comment was not clear. I meant ```java Import java.util.List; // $example off:schema_merging$ Import java.util.Properties; ``` I haven't tried to build the doc against the current state but I guess we won't need this import for Parquet`s schema mering example. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80350919 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java --- @@ -23,6 +23,8 @@ import java.util.List; // $example off:schema_merging$ +import java.util.Properties; + --- End diff -- No reason to not follow the guildline? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80350755 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java --- @@ -23,6 +23,8 @@ import java.util.List; // $example off:schema_merging$ +import java.util.Properties; + --- End diff -- Should this really be added to the example, though? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r80350458 --- Diff: examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java --- @@ -23,6 +23,8 @@ import java.util.List; // $example off:schema_merging$ +import java.util.Properties; + --- End diff -- I think we should put `java.util` imports together without additional newline. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77950064 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -44,6 +46,11 @@ class JDBCOptions( // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) + require(partitionColumn == null || +(partitionColumn != null && lowerBound != null && upperBound != null && numPartitions != null), --- End diff -- You can simplify it by ```Scala partitionColumn == null || (lowerBound != null && upperBound != null && numPartitions != null) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77949636 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +import collection.JavaConverters._ + +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new SQLException( --- End diff -- Yeah, then, just issue an `AnalysisException`, like what we did in the other places. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77949563 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -82,7 +81,7 @@ class JdbcRelationProvider extends CreatableRelationProvider val (doCreate, doSave) = (mode, tableExists) match { case (SaveMode.Ignore, true) => (false, false) -case (SaveMode.ErrorIfExists, true) => throw new TableAlreadyExistsException( +case (SaveMode.ErrorIfExists, true) => throw new SQLException( s"Table $table already exists, and SaveMode is set to ErrorIfExists.") --- End diff -- Normally, we issue an `AnalysisException`. `Table or view $table already exists` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77949242 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") --- End diff -- Added. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77949211 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException --- End diff -- It depends on if the table exception is used or not. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77949176 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType --- End diff -- Correct, this was left from SchemaRelationProvider --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77949090 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null +val partitionInfo = if (partitionColumn == null) { + null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new TableAlreadyExistsException( + s"Table $table already exists, and SaveMode is set to ErrorIfExists.") --- End diff -- No, [it is required...](https://github.com/apache/spark/blob/5c6b0855787c080d3e233eb09c05c025395e7cb3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala#L30) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948807 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null +val partitionInfo = if (partitionColumn == null) { + null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new TableAlreadyExistsException( + s"Table $table already exists, and SaveMode is set to ErrorIfExists.") --- End diff -- Just pass the table name. That is enough --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948661 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null +val partitionInfo = if (partitionColumn == null) { + null } else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new TableAlreadyExistsException( --- End diff -- You need to import it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948624 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null +val partitionInfo = if (partitionColumn == null) { + null --- End diff -- It sounds like you add an extra space. Could you run the command to check the style? `dev/lint-scala` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948654 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType + +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null +val partitionInfo = if (partitionColumn == null) { + null --- End diff -- It seems mistakenly white spaces were added here. scalastyle will complain about this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException --- End diff -- Not used, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948436 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,106 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import scala.collection.JavaConverters.mapAsJavaMapConverter -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType --- End diff -- Not used, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948300 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType --- End diff -- Not used? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948243 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) +.save + +assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) +assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { +import scala.collection.JavaConverters._ + +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) +val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + +df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() +df2.write.mode(SaveMode.Overwrite).format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() +assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) +assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + } + + test("save errors if url is not specified") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[RuntimeException] { + df.write.format("jdbc") +.option("dbtable", "TEST.TRUNCATETEST") +.options(properties.asScala) +.save() +}.getMessage +assert(e.contains("Option 'url' is required")) + } + + test("save errors if dbtable is not specified") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[RuntimeException] { + df.write.format("jdbc") +.option("url", url1) +.options(properties.asScala) +.save() +}.getMessage +assert(e.contains("Option 'dbtable' is required")) + } + + test("save errors if wrong user/password combination") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[org.h2.jdbc.JdbcSQLException] { --- End diff -- I must have copy/pasted this from somewhere...no clue why I would use a var otherwise. Shame on me :p --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948196 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } --- End diff -- Yes, but the guidelines do not specify this scenario. It is not returning a unit, but a value and looks ridiculous in comparison. I have made the change to fit your needs and "_speed_" up, though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948104 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +import collection.JavaConverters._ + +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new SQLException( --- End diff -- I didn't know that exception had been created in the Spark code. Any suggestions for an easy way to pull out the db, though? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77948082 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") --- End diff -- uh, we do not have a test case to cover that. Since you made a change, could you add such a test case? Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77947902 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,113 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { --- End diff -- I did add a comment in the method signature. That and the variable naming conventions should cover this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77947885 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) +.save + +assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) +assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { +import scala.collection.JavaConverters._ + +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) +val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + +df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") --- End diff -- How about `TRUNCATETEST` -> `SAVETEST`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77947343 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,113 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { --- End diff -- Then would it make sense if we add some comments for each case? In a quick look, it seems really confusing what each case means to me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946981 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +import collection.JavaConverters._ + +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new SQLException( + s"Table $table already exists, and SaveMode is set to ErrorIfExists.") +case (SaveMode.Overwrite, true) => + if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { +JdbcUtils.truncateTable(conn, table) +(false, true) + } else { +JdbcUtils.dropTable(conn, table) +(true, true) + } +case (SaveMode.Append, true) => (false, true) +case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," + + " for handling existing tables.") +case (_, false) => (true, true) + }
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946944 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +import collection.JavaConverters._ + +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new SQLException( + s"Table $table already exists, and SaveMode is set to ErrorIfExists.") +case (SaveMode.Overwrite, true) => + if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) { +JdbcUtils.truncateTable(conn, table) +(false, true) + } else { +JdbcUtils.dropTable(conn, table) +(true, true) + } +case (SaveMode.Append, true) => (false, true) +case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," + + " for handling existing tables.") +case (_, false) => (true, true) + }
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946854 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } --- End diff -- Let's follow this just to speed up and follow the majority of the other codes. There is a correct example in the guide lines as blow: ```scala // Correct: if (true) { println("Wow!") } ``` not ```scala // Correct: if (true) { println("Wow!") } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946835 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) +.save + +assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) +assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { +import scala.collection.JavaConverters._ + +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) +val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + +df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") --- End diff -- To what? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946806 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +import collection.JavaConverters._ + +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => throw new SQLException( --- End diff -- Please throw the exception `TableAlreadyExistsException`. The target could be a `View`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946715 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,113 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { --- End diff -- Ok. I am fine, if the other are ok about it. Let me review your version. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946660 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) +.save + +assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) +assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { +import scala.collection.JavaConverters._ + +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) +val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + +df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() +df2.write.mode(SaveMode.Overwrite).format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() +assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) +assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + } + + test("save errors if url is not specified") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[RuntimeException] { + df.write.format("jdbc") +.option("dbtable", "TEST.TRUNCATETEST") +.options(properties.asScala) +.save() +}.getMessage +assert(e.contains("Option 'url' is required")) + } + + test("save errors if dbtable is not specified") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[RuntimeException] { + df.write.format("jdbc") +.option("url", url1) +.options(properties.asScala) +.save() +}.getMessage +assert(e.contains("Option 'dbtable' is required")) + } + + test("save errors if wrong user/password combination") { +import scala.collection.JavaConverters._ --- End diff -- How about putting this import in the top with other imports if either is okay? I understand your point putting this inside but let's just follow the majority of other codes just to speed up. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946637 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } --- End diff -- I had put that when adding the brackets, but it actually hurts the code flow in this case. And there is nothing in the style guide saying otherwise. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946422 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +import collection.JavaConverters._ --- End diff -- I meant putting it up with other imports. Also, I remember this should be `scala.collection.JavaConverters`. scalastyle will check this IIRC. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946434 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") --- End diff -- It was moved into the JDBCOptions as had been previously discussed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946299 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,113 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { --- End diff -- Your way results in the need for a `return`, which can lead to problems and is [generally discouraged](https://tpolecat.github.io/2014/05/09/return.html). In the current implementation you could just have it do nothing and the next if block will be skipped anyway, but that leaves a lot of room for error in further code changes. Whereas this
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946277 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) +.save + +assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) +assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { +import scala.collection.JavaConverters._ + +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) +val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + +df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() +df2.write.mode(SaveMode.Overwrite).format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") + .options(properties.asScala) + .save() +assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) +assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + } + + test("save errors if url is not specified") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[RuntimeException] { + df.write.format("jdbc") +.option("dbtable", "TEST.TRUNCATETEST") +.options(properties.asScala) +.save() +}.getMessage +assert(e.contains("Option 'url' is required")) + } + + test("save errors if dbtable is not specified") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[RuntimeException] { + df.write.format("jdbc") +.option("url", url1) +.options(properties.asScala) +.save() +}.getMessage +assert(e.contains("Option 'dbtable' is required")) + } + + test("save errors if wrong user/password combination") { +import scala.collection.JavaConverters._ +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +var e = intercept[org.h2.jdbc.JdbcSQLException] { --- End diff -- `var` -> `val`. The same issues in the above test cases. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946237 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,75 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { +val df = sqlContext.createDataFrame(sparkContext.parallelize(arr2x2), schema2) + +df.write.format("jdbc") +.options(Map("url" -> url, "dbtable" -> "TEST.BASICCREATETEST")) +.save + +assert(2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) +assert( + 2 === sqlContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) + } + + test("save API with SaveMode.Overwrite") { +import scala.collection.JavaConverters._ + +val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) +val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) + +df.write.format("jdbc") + .option("url", url1) + .option("dbtable", "TEST.TRUNCATETEST") --- End diff -- Could you update the table names in all the test cases? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946201 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") --- End diff -- Any reason why this is removed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77946123 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) { null } --- End diff -- ```Scala if (partitionColumn == null) { null } else { ... } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77945639 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala --- @@ -208,4 +208,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).count()) assert(2 === spark.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } + + test("save works for format(\"jdbc\") if url and dbtable are set") { --- End diff -- Negative test cases are needed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77945599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,113 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { --- End diff -- I also prefer to my way, which looks cleaner and easier to understand. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77945537 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,104 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null --- End diff -- Please correct the style here. See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77945298 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,104 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ --- End diff -- Can we maybe move this up too if either is okay? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77944399 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,113 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + * + * *Overwrite & tableExists with truncate, will not drop & create, but instead truncate + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val jdbcOptions = new JDBCOptions(parameters) +val url = jdbcOptions.url +val table = jdbcOptions.table + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { --- End diff -- Initially, I mean to correct this as @gatorsmile did in [here](https://github.com/gatorsmile/spark/blob/07e316823ed17e89c3df0aaccf3fbb958afcfe3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L390-L423). I am not saying this is wrong or inappropriate but just personally I'd prefer this way. --- If your
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77239532 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -114,7 +115,9 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { +providedSchemaOption.getOrElse(JDBCRDD.resolveTable(url, table, properties)) + } --- End diff -- @HyukjinKwon I thought that since this was over the 100 limit it would be more readable/maintainable in the long run to have brackets. Again, I have no preference and if you feel strongly I will make the change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77238783 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +require(parameters.isDefinedAt("url"), "Saving jdbc source requires 'url' to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")") +require(parameters.isDefinedAt("dbtable"), "Saving jdbc source requires 'dbtable' to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")") +val url = parameters("url") +val table = parameters("dbtable") --- End diff -- @HyukjinKwon I thought about that, but this code ends up not needing the extra checks. So, it seemed unnecessary at this point. I can go either way on this one, though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77122510 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -17,39 +17,105 @@ package org.apache.spark.sql.execution.datasources.jdbc +import java.sql.SQLException import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +require(parameters.isDefinedAt("url"), "Saving jdbc source requires 'url' to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")") +require(parameters.isDefinedAt("dbtable"), "Saving jdbc source requires 'dbtable' to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")") +val url = parameters("url") +val table = parameters("dbtable") --- End diff -- Would this make sense if we use `JDBCOptions` here too? (rather than adding `require` duplicately) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail:
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77122262 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -114,7 +115,9 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { +providedSchemaOption.getOrElse(JDBCRDD.resolveTable(url, table, properties)) + } --- End diff -- It seems we might not need the extra brackets here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77122073 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -396,49 +396,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { +import scala.collection.JavaConverters._ --- End diff -- +1 for moving the import up. Just it looks ugly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77067046 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -36,4 +38,9 @@ private[jdbc] class JDBCOptions( val upperBound = parameters.getOrElse("upperBound", null) // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) + + require(partitionColumn == null || --- End diff -- Please double check my boolean conversion, as the prior if had to be flipped for the require. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77060202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions( val upperBound = parameters.getOrElse("upperBound", null) // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) + + if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { + sys.error("Partitioning incompletely specified") --- End diff -- Agreed, as already stated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77061367 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val url = parameters.getOrElse("url", + sys.error("Saving jdbc source requires url to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")")) +val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", + sys.error("Saving jdbc source requires dbtable to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => sys.error(s"Table $table already exists.") +case (SaveMode.Overwrite, true) => + JdbcUtils.dropTable(conn, table) ---
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77060138 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions( val upperBound = parameters.getOrElse("upperBound", null) --- End diff -- I am fine with changing this to require as the short-circuit should occur at the very top instead of as the processing moves through. `sys.error` was how the old code was written, so I just had kept it as is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77060790 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( --- End diff -- No, this is to meet the requirements of trait `RelationProvider`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r77058751 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -396,49 +396,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { +import scala.collection.JavaConverters._ --- End diff -- I opted to only import them here because it is the only place they are required, so there is no need to drag in the import to the whole class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r76512092 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") -} +val partitionColumn = jdbcOptions.partitionColumn +val lowerBound = jdbcOptions.lowerBound +val upperBound = jdbcOptions.upperBound +val numPartitions = jdbcOptions.numPartitions -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties, + Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val url = parameters.getOrElse("url", + sys.error("Saving jdbc source requires url to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")")) +val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", + sys.error("Saving jdbc source requires dbtable to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists = JdbcUtils.tableExists(conn, url, table) + + val (doCreate, doSave) = (mode, tableExists) match { +case (SaveMode.Ignore, true) => (false, false) +case (SaveMode.ErrorIfExists, true) => sys.error(s"Table $table already exists.") +case (SaveMode.Overwrite, true) => + JdbcUtils.dropTable(conn, table) --- End
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r76512084 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions( val upperBound = parameters.getOrElse("upperBound", null) --- End diff -- I see some of the existing code has a `getOrElse(..., sys.error...)`. I think we should switch to either ``` require(foo.isDefined, ...) ... foo.get(...) ``` or ``` foo.getOrElse(..., throw new IllegalArgumentException(...)) ``` One problem is that `sys.error` just generates a bald `RuntimeException`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r76512050 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,100 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( --- End diff -- Is this a separate method instead of using an optional arg to try to retain binary compatibility? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r76512038 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -396,49 +396,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { +import scala.collection.JavaConverters._ --- End diff -- Nit: I'd import these with other imports --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r76512043 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala --- @@ -36,4 +36,9 @@ private[jdbc] class JDBCOptions( val upperBound = parameters.getOrElse("upperBound", null) // the number of partitions val numPartitions = parameters.getOrElse("numPartitions", null) + + if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { + sys.error("Partitioning incompletely specified") --- End diff -- We should be using `require` not `sys.error` everywhere --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r69267676 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) +val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) +val partitionColumn = parameters.getOrElse("partitionColumn", null) +val lowerBound = parameters.getOrElse("lowerBound", null) +val upperBound = parameters.getOrElse("upperBound", null) +val numPartitions = parameters.getOrElse("numPartitions", null) --- End diff -- I think the validation can be done together in `JDBCOptions`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r69267347 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -96,7 +97,16 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { +val resolvedSchema = JDBCRDD.resolveTable(url, table, properties) +providedSchemaOption match { + case Some(providedSchema) => +if (providedSchema.sql.toLowerCase == resolvedSchema.sql.toLowerCase) resolvedSchema --- End diff -- I think `JDBCRDD.resolveTable` needs another query execution. Although it would be less expensive than inferring schemas in CSV or JSON, it would be still a bit of overhead. I am not 100% about this too. So, I think it might be better to be consistent with the others in this case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r67582844 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) +val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) +val partitionColumn = parameters.getOrElse("partitionColumn", null) +val lowerBound = parameters.getOrElse("lowerBound", null) +val upperBound = parameters.getOrElse("upperBound", null) +val numPartitions = parameters.getOrElse("numPartitions", null) + +if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { sys.error("Partitioning incompletely specified") } -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(url, table, parts, properties, Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val url = parameters.getOrElse("url", + sys.error("Saving jdbc source requires url to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")")) +val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", + sys.error("Saving jdbc source requires dbtable to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists =
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r67582037 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -96,7 +97,16 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { +val resolvedSchema = JDBCRDD.resolveTable(url, table, properties) +providedSchemaOption match { + case Some(providedSchema) => +if (providedSchema.sql.toLowerCase == resolvedSchema.sql.toLowerCase) resolvedSchema --- End diff -- I can easily do a simpler getOrElse as is done in [spark-xml](https://github.com/databricks/spark-xml/blob/9f681939d16508abf4a12a129469ffebf87a2fa4/src/main/scala/com/databricks/spark/xml/XmlRelation.scala) which has more of a benefit of being lazier. But if an error does occur due to a mismatch, then the error is further from the original issue. I'm fine with either scenario, but at least wanted to give the other side for this one. Thoughts? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r67539731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) +val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) +val partitionColumn = parameters.getOrElse("partitionColumn", null) +val lowerBound = parameters.getOrElse("lowerBound", null) +val upperBound = parameters.getOrElse("upperBound", null) +val numPartitions = parameters.getOrElse("numPartitions", null) --- End diff -- @HyukjinKwon Thanks, I did not know about this. Before I push code I was curious why JDBCOptions does not include the partitioning validation? That seems like a point of duplication also. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r66716164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -96,7 +97,16 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { +val resolvedSchema = JDBCRDD.resolveTable(url, table, properties) +providedSchemaOption match { + case Some(providedSchema) => +if (providedSchema.sql.toLowerCase == resolvedSchema.sql.toLowerCase) resolvedSchema --- End diff -- I guess it would make sense if it does not try to apply the resolved schema when the schema is explicitly set like the other data sources. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r66716162 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) +val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) +val partitionColumn = parameters.getOrElse("partitionColumn", null) +val lowerBound = parameters.getOrElse("lowerBound", null) +val upperBound = parameters.getOrElse("upperBound", null) +val numPartitions = parameters.getOrElse("numPartitions", null) + +if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { sys.error("Partitioning incompletely specified") } -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(url, table, parts, properties, Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val url = parameters.getOrElse("url", + sys.error("Saving jdbc source requires url to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")")) +val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", + sys.error("Saving jdbc source requires dbtable to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + +import collection.JavaConverters._ +val props = new Properties() +props.putAll(parameters.asJava) +val conn = JdbcUtils.createConnectionFactory(url, props)() + +try { + val tableExists =
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r66716158 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) +val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) +val partitionColumn = parameters.getOrElse("partitionColumn", null) +val lowerBound = parameters.getOrElse("lowerBound", null) +val upperBound = parameters.getOrElse("upperBound", null) +val numPartitions = parameters.getOrElse("numPartitions", null) --- End diff -- There is a class for those options, `JDBCOptions`. It would be nicer if those options are managed in a single place. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r66716161 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala --- @@ -19,37 +19,105 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.util.Properties -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider, SchemaRelationProvider} +import org.apache.spark.sql.types.StructType -class JdbcRelationProvider extends RelationProvider with DataSourceRegister { +class JdbcRelationProvider extends CreatableRelationProvider + with SchemaRelationProvider with RelationProvider with DataSourceRegister { override def shortName(): String = "jdbc" - /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val jdbcOptions = new JDBCOptions(parameters) -if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null -|| jdbcOptions.upperBound == null -|| jdbcOptions.numPartitions == null)) { +createRelation(sqlContext, parameters, null) + } + + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { +val url = parameters.getOrElse("url", sys.error("Option 'url' not specified")) +val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified")) +val partitionColumn = parameters.getOrElse("partitionColumn", null) +val lowerBound = parameters.getOrElse("lowerBound", null) +val upperBound = parameters.getOrElse("upperBound", null) +val numPartitions = parameters.getOrElse("numPartitions", null) + +if (partitionColumn != null + && (lowerBound == null || upperBound == null || numPartitions == null)) { sys.error("Partitioning incompletely specified") } -val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null -} else { +val partitionInfo = if (partitionColumn == null) null +else { JDBCPartitioningInfo( -jdbcOptions.partitionColumn, -jdbcOptions.lowerBound.toLong, -jdbcOptions.upperBound.toLong, -jdbcOptions.numPartitions.toInt) +partitionColumn, lowerBound.toLong, upperBound.toLong, numPartitions.toInt) } val parts = JDBCRelation.columnPartition(partitionInfo) val properties = new Properties() // Additional properties that we will pass to getConnection parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) -JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) +JDBCRelation(url, table, parts, properties, Option(schema))(sqlContext.sparkSession) + } + + /* + * The following structure applies to this code: + * |tableExists| !tableExists + * + * Ignore | BaseRelation | CreateTable, saveTable, BaseRelation + * ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation + * Overwrite | DropTable, CreateTable, | CreateTable, saveTable, BaseRelation + * | saveTable, BaseRelation | + * Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation + */ + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { +val url = parameters.getOrElse("url", + sys.error("Saving jdbc source requires url to be set." + +" (ie. df.option(\"url\", \"ACTUAL_URL\")")) +val table = parameters.getOrElse("dbtable", parameters.getOrElse("table", + sys.error("Saving jdbc source requires dbtable to be set." + +" (ie. df.option(\"dbtable\", \"ACTUAL_DB_TABLE\")"))) + +import collection.JavaConverters._ --- End diff -- I think this just can be imported at the class level rather than trying to import this for every time it creates a relation. --- If your project is set up for it, you can reply to
[GitHub] spark pull request #12601: [SPARK-14525][SQL] Make DataFrameWrite.save work ...
Github user JustinPihony commented on a diff in the pull request: https://github.com/apache/spark/pull/12601#discussion_r66009900 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -96,7 +97,16 @@ private[sql] case class JDBCRelation( override val needConversion: Boolean = false - override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) + override val schema: StructType = { +val resolvedSchema = JDBCRDD.resolveTable(url, table, properties) +providedSchemaOption match { + case Some(providedSchema) => +if (providedSchema.sql.toLowerCase == resolvedSchema.sql.toLowerCase) resolvedSchema --- End diff -- This is the only area I'm unsure about. I'd like a second opinion on whether this seems ok, or if I need to build something more custom for schema comparison. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org