[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15664


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-30 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94216803
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -108,14 +108,36 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * Returns a PreparedStatement that inserts a row into table via conn.
+   * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
    */
-  def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect)
-      : PreparedStatement = {
-    val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
+  def getInsertStatement(
+      table: String,
+      rddSchema: StructType,
+      tableSchema: Option[StructType],
+      isCaseSensitive: Boolean,
+      dialect: JdbcDialect): String = {
+    val columns = if (tableSchema.isEmpty) {
+      rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
--- End diff --

The legacy behavior is used when `tableSchema` is `None`.
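
A minimal sketch of that fallback, assuming plain column-name lists instead of `StructType` and a hypothetical `quoteIdentifier` stand-in for the dialect call. It is not the code from the patch; it only illustrates how a `None` table schema could keep the old column list while `Some(...)` switches to the database's spelling.

```scala
object InsertStatementSketch {
  // Hypothetical stand-in for JdbcDialect.quoteIdentifier; real dialects may quote differently.
  def quoteIdentifier(name: String): String = "\"" + name + "\""

  def getInsertStatement(
      table: String,
      rddColumns: Seq[String],
      tableColumns: Option[Seq[String]],
      isCaseSensitive: Boolean): String = {
    val columns = tableColumns match {
      case None =>
        // Legacy behavior: use the RDD column names unchanged.
        rddColumns
      case Some(existing) =>
        // Use the table's own spelling of each column, keeping the RDD order.
        rddColumns.map { col =>
          existing
            .find(c => if (isCaseSensitive) c == col else c.equalsIgnoreCase(col))
            .getOrElse(throw new IllegalArgumentException(
              s"""Column "$col" not found in schema ${existing.mkString(", ")}"""))
        }
    }
    val placeholders = columns.map(_ => "?").mkString(",")
    s"INSERT INTO $table (${columns.map(quoteIdentifier).mkString(",")}) VALUES ($placeholders)"
  }
}
```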





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-30 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94216756
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -67,16 +68,18 @@ class JdbcRelationProvider extends CreatableRelationProvider
             if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
               // In this case, we should truncate table and then load.
               truncateTable(conn, table)
-              saveTable(df, url, table, jdbcOptions)
+              val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
--- End diff --

I moved this into the `case` statements.
Since `JdbcUtils.tableExists` is already used, `getSchemaOption` can be skipped 
for the other `SaveMode`s.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-30 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94211157
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -57,26 +57,28 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val table = jdbcOptions.table
     val createTableOptions = jdbcOptions.createTableOptions
     val isTruncate = jdbcOptions.isTruncate
+    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
 
     val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
     try {
       val tableExists = JdbcUtils.tableExists(conn, url, table)
       if (tableExists) {
+        val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
         mode match {
           case SaveMode.Overwrite =>
             if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
               // In this case, we should truncate table and then load.
               truncateTable(conn, table)
-              saveTable(df, url, table, jdbcOptions)
+              saveTable(df, url, table, tableSchema.get, isCaseSensitive, jdbcOptions)
--- End diff --

Sure! Now, there is no pre-check.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-30 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94210039
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -108,14 +108,32 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * Returns a PreparedStatement that inserts a row into table via conn.
+   * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
    */
-  def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect)
-      : PreparedStatement = {
-    val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
+  def getInsertStatement(
+      table: String,
+      rddSchema: StructType,
+      tableSchema: StructType,
--- End diff --

Right. I see. I'll update like that.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94207926
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -108,14 +108,32 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * Returns a PreparedStatement that inserts a row into table via conn.
+   * Returns an Insert SQL statement for inserting a row into the target table via JDBC conn.
    */
-  def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect)
-      : PreparedStatement = {
-    val columns = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
+  def getInsertStatement(
+      table: String,
+      rddSchema: StructType,
+      tableSchema: StructType,
--- End diff --

If `tableSchema` is `None`, we should fall back to the original behavior. 
Please add the extra logic. Thanks!





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94207873
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -57,26 +57,28 @@ class JdbcRelationProvider extends CreatableRelationProvider
     val table = jdbcOptions.table
     val createTableOptions = jdbcOptions.createTableOptions
     val isTruncate = jdbcOptions.isTruncate
+    val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
 
     val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
     try {
       val tableExists = JdbcUtils.tableExists(conn, url, table)
       if (tableExists) {
+        val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
         mode match {
           case SaveMode.Overwrite =>
             if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
               // In this case, we should truncate table and then load.
               truncateTable(conn, table)
-              saveTable(df, url, table, jdbcOptions)
+              saveTable(df, url, table, tableSchema.get, isCaseSensitive, jdbcOptions)
--- End diff --

We need to pass `tableSchema` itself. Calling `tableSchema.get` is not safe. :)
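
To illustrate the concern, here is a small sketch with hypothetical names (`columnsToWrite` is not part of the patch, and column lists stand in for `StructType`). `Option.get` throws `NoSuchElementException` on `None`, whereas passing the `Option` through lets the callee choose a fallback.

```scala
import scala.util.Try

object OptionGetSketch {
  // Hypothetical helper: the callee decides what to do when no table schema is available.
  def columnsToWrite(rddColumns: Seq[String], tableSchema: Option[Seq[String]]): Seq[String] =
    tableSchema.getOrElse(rddColumns)

  def main(args: Array[String]): Unit = {
    val missing: Option[Seq[String]] = None
    // Option.get on None throws NoSuchElementException, so the Try is a Failure.
    println(Try(missing.get).isFailure)
    // Passing the Option through falls back to the RDD columns instead of throwing.
    println(columnsToWrite(Seq("id", "name"), missing))
  }
}
```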





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-29 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94177466
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
+      } catch {
+        case _: SQLException => None
+      } finally {
+        statement.close()
+      }
+    } catch {
+      case _: SQLException => None
+    }
+  }
+
+  /**
+   * Returns a schema using rddSchema's column sequence and tableSchema's column names.
+   *
+   * When appending data into some case-sensitive DBMSs like PostgreSQL/Oracle, we need to respect
+   * the existing case-sensitive column names instead of RDD column names for user convenience.
+   * See SPARK-18123 for more details.
+   */
+  def normalizeSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // identical names in a case-insensitive way
+        schema = schema.add(lowercaseNameMap(f.name.toLowerCase))
+      } else {
+        throw new AnalysisException(s"""Column "${f.name}" not found""")
--- End diff --

Oh, it's already in @gatorsmile's patch.
```
throw new AnalysisException(s"""Column "${col.name}" not found in schema $tableSchema""")
```





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-29 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94177293
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
+      } catch {
+        case _: SQLException => None
+      } finally {
+        statement.close()
+      }
+    } catch {
+      case _: SQLException => None
+    }
+  }
+
+  /**
+   * Returns a schema using rddSchema's column sequence and tableSchema's column names.
+   *
+   * When appending data into some case-sensitive DBMSs like PostgreSQL/Oracle, we need to respect
+   * the existing case-sensitive column names instead of RDD column names for user convenience.
+   * See SPARK-18123 for more details.
+   */
+  def normalizeSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // identical names in a case-insensitive way
+        schema = schema.add(lowercaseNameMap(f.name.toLowerCase))
+      } else {
+        throw new AnalysisException(s"""Column "${f.name}" not found""")
--- End diff --

It may affect some other test cases, but I'll try!





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-29 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94175664
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

Yep. That looks much better than the current one! Thank you.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94101576
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
--- End diff --

Then, we can keep the existing way. See 
https://github.com/apache/spark/compare/master...gatorsmile:pr-15664Changed1





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94099067
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
--- End diff --

yes.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94099004
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
--- End diff --

For unsupported types, it will throw an `SQLException`, right?





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94098773
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
+      } catch {
+        case _: SQLException => None
+      } finally {
+        statement.close()
+      }
+    } catch {
+      case _: SQLException => None
+    }
+  }
+
+  /**
+   * Returns a schema using rddSchema's column sequence and tableSchema's column names.
+   *
+   * When appending data into some case-sensitive DBMSs like PostgreSQL/Oracle, we need to respect
+   * the existing case-sensitive column names instead of RDD column names for user convenience.
+   * See SPARK-18123 for more details.
+   */
+  def normalizeSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // identical names in a case-insensitive way
+        schema = schema.add(lowercaseNameMap(f.name.toLowerCase))
+      } else {
+        throw new AnalysisException(s"""Column "${f.name}" not found""")
--- End diff --

The error message looks ambiguous. Maybe `Column "${f.name}" in RDD not found in table schema`?





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94098761
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

This is my rough idea: 
https://github.com/apache/spark/compare/master...gatorsmile:pr-15664Changed1





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94098595
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
--- End diff --

`getSchema` will throw an exception when the schema contains an unsupported 
type. Now we also use it to check whether the table exists. Does this change 
the current behavior? For example, an insertion that worked before might now fail.
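
One way to picture the concern is the sketch below. It assumes a hypothetical `resolve` function in place of the real schema lookup and is not the patch's implementation. If schema resolution is wrapped so that an `SQLException` yields `None`, the caller can still take the legacy insert path instead of failing.

```scala
import java.sql.SQLException

object SchemaOptionSketch {
  // `resolve` is a hypothetical stand-in for the real schema lookup.
  // An SQLException (for example from an unsupported column type) becomes None
  // rather than aborting the write.
  def getSchemaOption[S](resolve: () => S): Option[S] =
    try Some(resolve()) catch { case _: SQLException => None }

  def main(args: Array[String]): Unit = {
    println(getSchemaOption(() => Seq("id", "name")))
    println(getSchemaOption[Seq[String]](() => throw new SQLException("Unsupported type")))
  }
}
```

Only `SQLException` is caught here, so other failures still propagate to the caller.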





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94095175
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

We need to think about a better API for `savePartition`. 
Let me submit another commit to show you my rough idea.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94095014
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

Er, the target/table schema can have a different column order than the RDD 
schema. Why do we need to use the table schema? We actually need only the 
table's column names.
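
A rough sketch of that point, using plain column-name lists and a hypothetical `normalizeColumns` helper rather than the `StructType`-based `normalizeSchema` in the diff. The result keeps the RDD's column order and borrows only the spelling from the table.

```scala
import java.util.Locale

object ColumnOrderSketch {
  // Returns the table's spelling of each RDD column, in RDD order.
  def normalizeColumns(
      rddColumns: Seq[String],
      tableColumns: Seq[String],
      caseSensitive: Boolean): Seq[String] = {
    val exact = tableColumns.map(c => c -> c).toMap
    val lowercase = tableColumns.map(c => c.toLowerCase(Locale.ROOT) -> c).toMap
    rddColumns.map { col =>
      exact.get(col)
        // Fall back to a case-insensitive match only when case sensitivity is off.
        .orElse(if (caseSensitive) None else lowercase.get(col.toLowerCase(Locale.ROOT)))
        .getOrElse(throw new IllegalArgumentException(
          s"""Column "$col" in RDD not found in table schema"""))
    }
  }
}
```

With table columns `ID, NAME` and RDD columns `name, id`, the case-insensitive call yields `NAME, ID`, so the value setters can still follow the RDD's field order.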





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94094859
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

I see.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94094799
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

See the comment in 
https://github.com/apache/spark/pull/15664#discussion_r94094625





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94094789
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
+      } catch {
+        case _: SQLException => None
+      } finally {
+        statement.close()
+      }
+    } catch {
+      case _: SQLException => None
+    }
+  }
+
+  /**
+   * Returns a schema using rddSchema's column sequence and tableSchema's column names.
+   *
+   * When appending data into some case-sensitive DBMSs like PostgreSQL/Oracle, we need to respect
+   * the existing case-sensitive column names instead of RDD column names for user convenience.
+   * See SPARK-18123 for more details.
+   */
+  def normalizeSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
--- End diff --

Then, do you want to create `rddSchema` StructField with tableSchema's 
column name (if not matched)?





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94094718
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,9 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
-        .map(makeSetter(conn, dialect, _)).toArray
-      val numFields = rddSchema.fields.length
+      val stmt = insertStatement(conn, table, schema, dialect)
--- End diff --

`insertStatement` should use the target/table schema; the other places in 
this function should use the source/rdd schema.
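
A minimal sketch of that separation, using plain Scala stand-ins rather than
Spark's `StructType`. The `Field` case class, `insertSql`, and `sourceTypes`
are hypothetical names for illustration and are not code from the PR. It
assumes the table columns have already been ordered to match the RDD columns:
the `INSERT` text is built from the table column names only, while value
handling (here reduced to the data types) is driven by the RDD schema.

```scala
// Illustrative stand-in for a schema field; this is not Spark's StructField.
final case class Field(name: String, dataType: String)

object SchemaRoles {
  // Builds the INSERT text from the target/table column names only.
  def insertSql(table: String, tableColumns: Seq[Field], quote: String => String): String = {
    val columns = tableColumns.map(f => quote(f.name)).mkString(",")
    val placeholders = tableColumns.map(_ => "?").mkString(",")
    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
  }

  // Value setters would be chosen from the source/RDD field types.
  def sourceTypes(rddSchema: Seq[Field]): Seq[String] = rddSchema.map(_.dataType)
}
```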





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94094625
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
+      } catch {
+        case _: SQLException => None
+      } finally {
+        statement.close()
+      }
+    } catch {
+      case _: SQLException => None
+    }
+  }
+
+  /**
+   * Returns a schema using rddSchema's column sequence and tableSchema's column names.
+   *
+   * When appending data into some case-sensitive DBMSs like PostgreSQL/Oracle, we need to respect
+   * the existing case-sensitive column names instead of RDD column names for user convenience.
+   * See SPARK-18123 for more details.
+   */
+  def normalizeSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
--- End diff --

After rethinking it, the result of `normalizeSchema` is a mix of two schemas
(source/RDD schema + target/table schema). We should not mix them together.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94094524
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,10 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
+      val stmt = insertStatement(conn, table, schema, dialect)
+      val setters: Array[JDBCValueSetter] = schema.fields.map(_.dataType)
         .map(makeSetter(conn, dialect, _)).toArray
--- End diff --

Sure!





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94093997
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -568,10 +617,10 @@ object JdbcUtils extends Logging {
         conn.setAutoCommit(false) // Everything in the same db transaction.
         conn.setTransactionIsolation(finalIsolationLevel)
       }
-      val stmt = insertStatement(conn, table, rddSchema, dialect)
-      val setters: Array[JDBCValueSetter] = rddSchema.fields.map(_.dataType)
+      val stmt = insertStatement(conn, table, schema, dialect)
+      val setters: Array[JDBCValueSetter] = schema.fields.map(_.dataType)
         .map(makeSetter(conn, dialect, _)).toArray
--- End diff --

Could you just simplify it? 
```Scala
val setters = schema.fields.map(f => makeSetter(conn, dialect, f.dataType))
```





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94093301
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

JDBC data sources might detect whether the table exists when preparing the
SQL statement. Thus, please keep the `try`. Thanks.
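
A rough sketch of that point, using only `java.sql` and no Spark types.
`SchemaProbe.columnNames` and its `schemaQuery` parameter are hypothetical
names, not part of `JdbcUtils`. Because a driver may reject a missing table
already in `prepareStatement`, that call sits inside the `try` as well; here a
single outer `catch` covers both the prepare and the execute step, which is
one possible arrangement rather than the PR's exact code.

```scala
import java.sql.{Connection, SQLException}

object SchemaProbe {
  // Returns Some(column labels) if the query can be prepared and executed,
  // and None if either step raises SQLException (e.g. the table is missing).
  def columnNames(conn: Connection, schemaQuery: String): Option[Seq[String]] = {
    try {
      // Some drivers validate the table here, so this call itself may throw.
      val statement = conn.prepareStatement(schemaQuery)
      try {
        val meta = statement.executeQuery().getMetaData
        Some((1 to meta.getColumnCount).map(i => meta.getColumnLabel(i)))
      } finally {
        // Always release the statement, whether or not the query succeeded.
        statement.close()
      }
    } catch {
      case _: SQLException => None
    }
  }
}
```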





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94079568
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

Is it better not to hide that exception here? If so, I'll remove that `try`.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94079276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

Yes. It's due to `PreparedStatement prepareStatement(String sql) throws
SQLException`. I want to make sure it returns `None` instead of raising
`SQLException` at this line.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94063678
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

Why should this be included in the `try` block?





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94063172
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,55 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        Some(getSchema(statement.executeQuery(), dialect))
+      } catch {
+        case _: SQLException => None
+      } finally {
+        statement.close()
+      }
+    } catch {
+      case _: SQLException => None
+    }
+  }
+
+  /**
+   * Returns a schema using rddSchema's column sequence and tableSchema's column names.
+   *
+   * When appending data into some case-sensitive DBMSs like PostgreSQL/Oracle, we need to respect
+   * the existing case-sensitive column names instead of RDD column names for user convenience.
+   * See SPARK-18123 for more details.
+   */
+  def normalizeSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
--- End diff --

`isCaseSensitive`





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94007008
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -60,23 +60,27 @@ class JdbcRelationProvider extends CreatableRelationProvider
 
     val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
     try {
-      val tableExists = JdbcUtils.tableExists(conn, url, table)
+      val tableSchema = JdbcUtils.getSchema(conn, url, table)
+      val tableExists = tableSchema.isDefined
+      val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
       if (tableExists) {
         mode match {
           case SaveMode.Overwrite =>
-            if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
+            val savingSchema = if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
               // In this case, we should truncate table and then load.
               truncateTable(conn, table)
-              saveTable(df, url, table, jdbcOptions)
+              JdbcUtils.getSavingSchema(df.schema, tableSchema.get, caseSensitive)
             } else {
               // Otherwise, do not truncate the table, instead drop and recreate it
               dropTable(conn, table)
               createTable(df.schema, url, table, createTableOptions, conn)
-              saveTable(df, url, table, jdbcOptions)
+              df.schema
             }
+            saveTable(df, url, table, savingSchema, jdbcOptions)
--- End diff --

That could work. But to do that, `JdbcUtils.saveTable` needs to understand
`SaveMode`, too. Is that okay?
Currently, `JdbcUtils` only provides fairly primitive APIs.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94004565
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and tableSchema's name.
+   */
+  def getSavingSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // case-insensitive identical names
+        schema = schema.add(lowercaseNameMap(f.name.toLowerCase))
+      } else {
+        throw new org.apache.spark.SparkException(s"""Column "${f.name}" not found""")
--- End diff --

Sure!





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94004348
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and tableSchema's name.
+   */
+  def getSavingSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // case-insensitive identical names
--- End diff --

My bad. I meant `case-insensitively identical names`. I'll revise this.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94004299
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and tableSchema's name.
--- End diff --

Yep. I'll add more accurate details here.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94004271
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
--- End diff --

I see. Thank you for the review! I wrote it that way to keep the same logic as
[tableExists](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L66-L80),
because before this PR I thought the guideline was only about using `Try` as
a return value.

Sure, I'll remove the usage of `Try/Success/Failure`.
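
The thread does not spell out why the `Try` form was discouraged. One
observable difference between the two styles is sketched below under that
caveat; `ErrorScope`, `broad`, and `narrow` are hypothetical names used only
for illustration. `Try(...)` turns every non-fatal exception into a failure,
whereas an explicit `catch` can map only `SQLException` to `None` and let
unrelated errors propagate.

```scala
import java.sql.SQLException
import scala.util.Try

object ErrorScope {
  // Equivalent to matching Success => Some and Failure => None:
  // every non-fatal exception becomes None.
  def broad[A](body: => A): Option[A] = Try(body).toOption

  // Only SQLException becomes None; any other exception propagates.
  def narrow[A](body: => A): Option[A] =
    try Some(body) catch { case _: SQLException => None }
}
```

With `broad`, a programming error such as an `IllegalStateException` is
silently reported as "no result"; with `narrow`, it still surfaces to the
caller.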





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94001020
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -60,23 +60,27 @@ class JdbcRelationProvider extends CreatableRelationProvider
 
     val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
     try {
-      val tableExists = JdbcUtils.tableExists(conn, url, table)
+      val tableSchema = JdbcUtils.getSchema(conn, url, table)
+      val tableExists = tableSchema.isDefined
+      val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
       if (tableExists) {
         mode match {
           case SaveMode.Overwrite =>
-            if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
+            val savingSchema = if (isTruncate && isCascadingTruncateTable(url) == Some(false)) {
               // In this case, we should truncate table and then load.
               truncateTable(conn, table)
-              saveTable(df, url, table, jdbcOptions)
+              JdbcUtils.getSavingSchema(df.schema, tableSchema.get, caseSensitive)
             } else {
               // Otherwise, do not truncate the table, instead drop and recreate it
               dropTable(conn, table)
               createTable(df.schema, url, table, createTableOptions, conn)
-              saveTable(df, url, table, jdbcOptions)
+              df.schema
             }
+            saveTable(df, url, table, savingSchema, jdbcOptions)
--- End diff --

How about passing the table schema and resolving the schema inside
`saveTable`? It might simplify the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94000876
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and tableSchema's name.
--- End diff --

Here, we need to explain why we use the column sequence of `rddSchema` and
why we use the column names of `tableSchema`.
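
A small sketch of the idea behind that request, on plain column-name lists
instead of `StructType`. `ColumnNames.resolve` is a hypothetical helper, not
the PR's function, and it throws `IllegalArgumentException` only to stay
free of Spark dependencies. The output keeps the RDD column order, since row
values are bound positionally in that order, but takes each name's spelling
from the table, since quoted identifiers can be case-sensitive in databases
such as PostgreSQL or Oracle.

```scala
object ColumnNames {
  // For each RDD column, in RDD order, pick the matching table column name.
  def resolve(
      rddColumns: Seq[String],
      tableColumns: Seq[String],
      caseSensitive: Boolean): Seq[String] = {
    val exact = tableColumns.toSet
    val byLower = tableColumns.map(c => c.toLowerCase -> c).toMap
    rddColumns.map { col =>
      if (exact.contains(col)) {
        col // identical names
      } else if (!caseSensitive && byLower.contains(col.toLowerCase)) {
        byLower(col.toLowerCase) // same name ignoring case: use the table's spelling
      } else {
        throw new IllegalArgumentException(s"""Column "$col" not found""")
      }
    }
  }
}
```

For example, RDD columns `name, id` against table columns `ID, Name` resolve
to `Name, ID` when `caseSensitive` is false, and fail when it is true.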





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94000781
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and tableSchema's name.
+   */
+  def getSavingSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // case-insensitive identical names
+        schema = schema.add(lowercaseNameMap(f.name.toLowerCase))
+      } else {
+        throw new org.apache.spark.SparkException(s"""Column "${f.name}" not found""")
--- End diff --

`org.apache.spark.SparkException` -> `AnalysisException`





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94000757
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): Option[StructType] = {
+    val dialect = JdbcDialects.get(url)
+
+    Try {
+      val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+      try {
+        getSchema(statement.executeQuery(), dialect)
+      } finally {
+        statement.close()
+      }
+    } match {
+      case Success(v) =>
+        Some(v)
+      case Failure(e) =>
+        None
+    }
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and tableSchema's name.
+   */
+  def getSavingSchema(
+      rddSchema: StructType,
+      tableSchema: StructType,
+      caseSensitive: Boolean): StructType = {
+    val nameMap = tableSchema.fields.map(f => f.name -> f).toMap
+    val lowercaseNameMap = tableSchema.fields.map(f => f.name.toLowerCase -> f).toMap
+
+    var schema = new StructType()
+    rddSchema.fields.foreach { f =>
+      if (nameMap.isDefinedAt(f.name)) {
+        // identical names
+        schema = schema.add(nameMap(f.name))
+      } else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
+        // case-insensitive identical names
--- End diff --

The comment needs to be improved. We actually return the case-sensitive
column names.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r94000523
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): 
Option[StructType] = {
+val dialect = JdbcDialects.get(url)
+
+Try {
+  val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+  try {
+getSchema(statement.executeQuery(), dialect)
+  } finally {
+statement.close()
+  }
+} match {
+  case Success(v) =>
+Some(v)
+  case Failure(e) =>
+None
+}
+  }
+
+  /**
+   * Returns the saving schema using rddSchema's sequence and 
tableSchema's name.
--- End diff --

`saving schema` is not right. We need a better name here. 

`rddSchema's sequence and tableSchema's name` -> `rddSchema's column 
sequence and tableSchema's column names`





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93999899
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala
 ---
@@ -60,23 +60,27 @@ class JdbcRelationProvider extends 
CreatableRelationProvider
 
 val conn = JdbcUtils.createConnectionFactory(jdbcOptions)()
 try {
-  val tableExists = JdbcUtils.tableExists(conn, url, table)
+  val tableSchema = JdbcUtils.getSchema(conn, url, table)
+  val tableExists = tableSchema.isDefined
+  val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
   if (tableExists) {
 mode match {
   case SaveMode.Overwrite =>
-if (isTruncate && isCascadingTruncateTable(url) == 
Some(false)) {
+val savingSchema = if (isTruncate && 
isCascadingTruncateTable(url) == Some(false)) {
   // In this case, we should truncate table and then load.
   truncateTable(conn, table)
-  saveTable(df, url, table, jdbcOptions)
+  JdbcUtils.getSavingSchema(df.schema, tableSchema.get, 
caseSensitive)
--- End diff --

Based on your current implementation, the `tableSchema` could be None, 
right?
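
One way to make the `None` case explicit is to pattern match on the `Option` rather than calling `.get`. The following is only a minimal sketch in plain Scala: `SaveDecisionSketch` and `describe` are hypothetical names, and a `Seq[String]` of column names stands in for `StructType`.

```scala
object SaveDecisionSketch {
  // Pattern match on the Option so that no branch has to call .get.
  // Seq[String] is a simplified stand-in for the table's StructType.
  def describe(tableSchema: Option[Seq[String]]): String = tableSchema match {
    case Some(cols) => s"table exists with columns ${cols.mkString(",")}"
    case None => "table does not exist"
  }
}
```

With this shape, the compiler forces both the existing-table and the missing-table branches to be handled.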





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93999377
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -211,6 +211,52 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Returns the schema if the table already exists in the JDBC database.
+   */
+  def getSchema(conn: Connection, url: String, table: String): 
Option[StructType] = {
+val dialect = JdbcDialects.get(url)
+
+Try {
+  val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
+  try {
+getSchema(statement.executeQuery(), dialect)
+  } finally {
+statement.close()
+  }
+} match {
+  case Success(v) =>
+Some(v)
+  case Failure(e) =>
+None
+}
--- End diff --

Please do not use Try/Success/Failure. 
https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try
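
A minimal sketch of the same lookup written with a plain `try`/`catch`, which is the form the linked style guide prefers over `Try`. It is plain Scala without Spark: `SchemaLookupSketch`, `querySchema`, and the `Map` of known tables are hypothetical stand-ins for the real JDBC call.

```scala
import java.sql.SQLException

object SchemaLookupSketch {
  // Hypothetical stand-in for running the dialect's schema query:
  // throws SQLException when the table is unknown.
  def querySchema(known: Map[String, Seq[String]], table: String): Seq[String] =
    known.getOrElse(table, throw new SQLException(s"Table $table not found"))

  // Plain try/catch instead of Try/Success/Failure. Only the expected
  // SQLException is mapped to None; any other exception propagates.
  def getSchemaOption(known: Map[String, Seq[String]], table: String): Option[Seq[String]] =
    try {
      Some(querySchema(known, table))
    } catch {
      case _: SQLException => None
    }
}
```

Catching only `SQLException` is an assumption of this sketch; it keeps unrelated failures visible instead of silently turning them into "table does not exist".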







[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93864198
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
+var columns: String = ""
+try {
+  val tableSchema = getSchema(tableSchemaQuery.executeQuery(), dialect)
+  val nameMap = tableSchema.fields.map(f => f.name -> f.name).toMap
+  val lowercaseNameMap = tableSchema.fields.map(f => 
f.name.toLowerCase -> f.name).toMap
+  columns = rddSchema.fields.map { x =>
+if (nameMap.isDefinedAt(x.name)) {
+  dialect.quoteIdentifier(x.name)
+} else if (lowercaseNameMap.isDefinedAt(x.name.toLowerCase)) {
+  dialect.quoteIdentifier(lowercaseNameMap(x.name.toLowerCase))
+} else {
+  throw new SQLException(s"""Column "${x.name}" not found""")
+}
--- End diff --

Yep. I'll fix that.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93859518
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

Thank you for the review, @gatorsmile.
Yes, that looks great! We can use `getSchemaQuery` instead of `tableExists`.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93847101
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
+var columns: String = ""
+try {
+  val tableSchema = getSchema(tableSchemaQuery.executeQuery(), dialect)
+  val nameMap = tableSchema.fields.map(f => f.name -> f.name).toMap
+  val lowercaseNameMap = tableSchema.fields.map(f => 
f.name.toLowerCase -> f.name).toMap
+  columns = rddSchema.fields.map { x =>
+if (nameMap.isDefinedAt(x.name)) {
+  dialect.quoteIdentifier(x.name)
+} else if (lowercaseNameMap.isDefinedAt(x.name.toLowerCase)) {
+  dialect.quoteIdentifier(lowercaseNameMap(x.name.toLowerCase))
+} else {
+  throw new SQLException(s"""Column "${x.name}" not found""")
+}
+  }.mkString(",")
+} finally {
+  tableSchemaQuery.close()
+}
 val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
 val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
 conn.prepareStatement(sql)
--- End diff --

Can we build the INSERT SQL statement in `saveTable` based on the schema? 
No need to prepare the generated statement in `saveTable`.
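
A minimal sketch of building the INSERT text as a plain string from resolved column names, leaving the `prepareStatement` call to whoever holds the connection. `InsertStatementSketch`, `insertSql`, and this `quoteIdentifier` are hypothetical; double-quote quoting is assumed here, and real dialects may quote differently.

```scala
object InsertStatementSketch {
  // Hypothetical stand-in for dialect.quoteIdentifier; double quotes are
  // an assumption of this sketch, and actual dialects may differ.
  def quoteIdentifier(name: String): String = "\"" + name + "\""

  // Builds the SQL text only; nothing is prepared or executed here.
  def insertSql(table: String, columns: Seq[String]): String = {
    val cols = columns.map(quoteIdentifier).mkString(",")
    val placeholders = columns.map(_ => "?").mkString(",")
    s"INSERT INTO $table ($cols) VALUES ($placeholders)"
  }
}
```

Because the result is just a `String`, it can be computed once and handed to each task that later prepares it on its own connection.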





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93846787
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
+var columns: String = ""
+try {
+  val tableSchema = getSchema(tableSchemaQuery.executeQuery(), dialect)
+  val nameMap = tableSchema.fields.map(f => f.name -> f.name).toMap
+  val lowercaseNameMap = tableSchema.fields.map(f => 
f.name.toLowerCase -> f.name).toMap
+  columns = rddSchema.fields.map { x =>
+if (nameMap.isDefinedAt(x.name)) {
+  dialect.quoteIdentifier(x.name)
+} else if (lowercaseNameMap.isDefinedAt(x.name.toLowerCase)) {
+  dialect.quoteIdentifier(lowercaseNameMap(x.name.toLowerCase))
+} else {
+  throw new SQLException(s"""Column "${x.name}" not found""")
+}
--- End diff --

The name resolution should still be controlled by 
`spark.sql.caseSensitive`, right?
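
A minimal sketch of name resolution gated by a case-sensitivity flag, in plain Scala. `ColumnResolutionSketch` and `resolve` are hypothetical names, and the `caseSensitive` parameter stands in for the value of `spark.sql.caseSensitive`.

```scala
object ColumnResolutionSketch {
  // Resolves one DataFrame column name against the table's column names.
  // Returns the name as spelled in the table, or None if nothing matches.
  def resolve(
      rddName: String,
      tableNames: Seq[String],
      caseSensitive: Boolean): Option[String] = {
    if (tableNames.contains(rddName)) {
      // Identical names match regardless of the flag.
      Some(rddName)
    } else if (!caseSensitive) {
      // Fall back to a lowercase comparison only when allowed.
      tableNames.find(_.toLowerCase == rddName.toLowerCase)
    } else {
      None
    }
  }
}
```

In this sketch, a DataFrame column `A` resolves to table column `a` only when `caseSensitive` is false; otherwise the caller gets `None` and can raise a "column not found" error.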





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r93844970
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

We can get the table schema [when we check whether the table exists](https://github.com/apache/spark/blob/fb07bbe575aabe68422fd3a31865101fb7fa1722/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala#L63).
 





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-07 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r91287287
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

Thank you for the review, @viirya. I'll try to update it that way.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r90826780
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
--- End diff --

I think the table schema won't change while all the data is being inserted. 
Right now you query the table schema for every insert statement. Can we do this 
once on the caller side (i.e., `savePartition`) and then reuse the schema?
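
A minimal sketch of hoisting the schema lookup into the caller, in plain Scala without Spark or JDBC. `SchemaReuseSketch`, `fetchTableColumns`, and `saveAll` are hypothetical; the counter exists only to show how often the lookup runs.

```scala
object SchemaReuseSketch {
  // Counts how many times the (hypothetical) schema query is issued.
  var schemaQueries = 0

  // Hypothetical stand-in for asking the database for the table's columns.
  def fetchTableColumns(): Seq[String] = {
    schemaQueries += 1
    Seq("a", "B")
  }

  // The caller resolves the columns once and reuses the finished SQL text
  // for every partition, so the lookup does not run per partition.
  def saveAll(partitions: Seq[Seq[Int]]): Seq[(String, Int)] = {
    val columns = fetchTableColumns()
    val cols = columns.mkString(",")
    val placeholders = columns.map(_ => "?").mkString(",")
    val sql = s"INSERT INTO t ($cols) VALUES ($placeholders)"
    partitions.map(rows => (sql, rows.size))
  }
}
```

However many partitions are passed in, `schemaQueries` increases by one per `saveAll` call in this sketch.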





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-12-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r90799712
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
-val columns = rddSchema.fields.map(x => 
dialect.quoteIdentifier(x.name)).mkString(",")
+// Use database column names instead of RDD schema column names
+val tableSchemaQuery = 
conn.prepareStatement(dialect.getSchemaQuery(table))
+var columns: String = ""
+try {
+  val tableSchema = getSchema(tableSchemaQuery.executeQuery(), dialect)
+  val nameMap = tableSchema.fields.map(f => f.name -> f.name).toMap
+  val lowercaseNameMap = tableSchema.fields.map(f => 
f.name.toLowerCase -> f.name).toMap
+  columns = rddSchema.fields.map { x =>
+if (nameMap.isDefinedAt(x.name)) {
+  dialect.quoteIdentifier(x.name)
+} else if (lowercaseNameMap.isDefinedAt(x.name.toLowerCase)) {
+  dialect.quoteIdentifier(nameMap(x.name.toLowerCase))
--- End diff --

nameMap -> lowercaseNameMap.





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-10-28 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/15664#discussion_r85616485
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -112,7 +112,25 @@ object JdbcUtils extends Logging {
*/
   def insertStatement(conn: Connection, table: String, rddSchema: 
StructType, dialect: JdbcDialect)
   : PreparedStatement = {
--- End diff --

Actually, this is an approach similar to `normalizePartitionSpec` in 
`PartitioningUtils.scala`.


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala#L248-L251





[GitHub] spark pull request #15664: [SPARK-18123][SQL] Use db column names instead of...

2016-10-27 Thread dongjoon-hyun
GitHub user dongjoon-hyun opened a pull request:

https://github.com/apache/spark/pull/15664

[SPARK-18123][SQL] Use db column names instead of RDD column ones during 
JDBC Writing

## What changes were proposed in this pull request?

Apache Spark supports the following cases **by quoting RDD column names** 
while saving through JDBC.
* Allow a reserved keyword as a column name, e.g., 'order'.
* Allow mixed-case column names, e.g., `[a: int, A: int]`.

```scala
scala> val df = sql("select 1 a, 1 A")
df: org.apache.spark.sql.DataFrame = [a: int, A: int]
scala> val option = Map("url" -> "jdbc:postgresql:postgres", "dbtable" -> "mixed", "user" -> "postgres", "password" -> "test")
scala> df.write.mode("overwrite").format("jdbc").options(option).save()
scala> df.write.mode("append").format("jdbc").options(option).save()
```

This PR aims to use database column names instead of RDD column ones in 
order to additionally support the following case.
Note that this case already succeeded with `MySQL`, but previously failed on 
`Postgres`/`Oracle`.

```scala
val df1 = sql("select 1 a")
val df2 = sql("select 1 A")
...
df1.write.mode("overwrite").format("jdbc").options(option).save()
df2.write.mode("append").format("jdbc").options(option).save()
```

## How was this patch tested?

Passes the Jenkins tests with a new test case.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dongjoon-hyun/spark SPARK-18123

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15664.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15664


commit 9558f96eb8d66ed89b2b507e81a285f710c82262
Author: Dongjoon Hyun 
Date:   2016-10-27T21:30:58Z

[SPARK-18123][SQL] Use database column names instead of RDD schema column 
names



