This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6994bad5e6e [SPARK-44262][SQL] Add `dropTable` and `getInsertStatement` to JdbcDialect
6994bad5e6e is described below

commit 6994bad5e6ea8700d48cbe20e9b406b89925adc7
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Mon Oct 16 13:55:24 2023 +0500

    [SPARK-44262][SQL] Add `dropTable` and `getInsertStatement` to JdbcDialect
    
    ### What changes were proposed in this pull request?
    1. This PR adds a `dropTable` function to `JdbcDialect`, so users can override the drop-table SQL in a custom dialect for a database such as Neo4j.
    Neo4j drop case:
    ```sql
    MATCH (m:Person {name: 'Mark'})
    DELETE m
    ```
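    A minimal sketch of a custom dialect overriding the new hook (the `Neo4jDialect` name and the Cypher translation are hypothetical, for illustration only):
    ```scala
    import org.apache.spark.sql.jdbc.JdbcDialect

    // Hypothetical dialect: Neo4j has no DROP TABLE, so translate the drop
    // into a Cypher-style delete of all nodes carrying the table's label.
    object Neo4jDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:neo4j")

      override def dropTable(table: String): String =
        s"MATCH (n:$table) DETACH DELETE n"
    }
    ```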
    2. This PR also adds `getInsertStatement` for the same reason.
    Neo4j insert case:
    ```sql
    CREATE (p:Person {name: 'Jennifer', birthdate: date('1980-01-01')})
    RETURN p
    ```
    Neo4j's query language (in fact named Cypher, abbreviated `CQL`) is not like normal SQL, but Neo4j does have a JDBC driver.
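    A corresponding sketch for the insert path, overriding the new `insertIntoTable` hook that `getInsertStatement` now delegates to (again hypothetical; the real Neo4j JDBC driver's placeholder handling may differ):
    ```scala
    import org.apache.spark.sql.jdbc.JdbcDialect
    import org.apache.spark.sql.types.StructField

    // Hypothetical dialect: emit a Cypher-style CREATE with one "?"
    // placeholder per field instead of INSERT INTO ... VALUES (...).
    object Neo4jInsertDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:neo4j")

      override def insertIntoTable(table: String, fields: Array[StructField]): String = {
        val props = fields.map(f => s"${f.name}: ?").mkString(", ")
        s"CREATE (n:$table {$props})"
      }
    }
    ```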
    
    ### Why are the changes needed?
    Make `JdbcDialect` more useful: dialects for databases with non-standard syntax can now supply their own drop and insert statements.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #41855 from Hisoka-X/SPARK-44262_JDBCUtils_improve.
    
    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 14 +++++------
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 29 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fb9e11df188..f2b84810175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -78,7 +78,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * Drops a table from the JDBC database.
    */
   def dropTable(conn: Connection, table: String, options: JDBCOptions): Unit = {
-    executeStatement(conn, options, s"DROP TABLE $table")
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropTable(table))
   }
 
   /**
@@ -114,22 +115,19 @@ object JdbcUtils extends Logging with SQLConfHelper {
       isCaseSensitive: Boolean,
       dialect: JdbcDialect): String = {
     val columns = if (tableSchema.isEmpty) {
-      rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
+      rddSchema.fields
     } else {
       // The generated insert statement needs to follow rddSchema's column sequence and
       // tableSchema's column names. When appending data into some case-sensitive DBMSs like
       // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
       // RDD column names for user convenience.
-      val tableColumnNames = tableSchema.get.fieldNames
       rddSchema.fields.map { col =>
-        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
+        tableSchema.get.find(f => conf.resolver(f.name, col.name)).getOrElse {
           throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
         }
-        dialect.quoteIdentifier(normalizedName)
-      }.mkString(",")
+      }
     }
-    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
-    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
+    dialect.insertIntoTable(table, columns)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 22625523a04..37c378c294c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -193,6 +193,24 @@ abstract class JdbcDialect extends Serializable with Logging {
     statement.executeUpdate(s"CREATE TABLE $tableName ($strSchema) $createTableOptions")
   }
 
+  /**
+   * Returns an Insert SQL statement template for inserting a row into the target table via JDBC
+   * conn. Use "?" as placeholder for each value to be inserted.
+   * E.g. `INSERT INTO t ("name", "age", "gender") VALUES (?, ?, ?)`
+   *
+   * @param table The name of the table.
+   * @param fields The fields of the row that will be inserted.
+   * @return The SQL query to use for inserting data into the table.
+   */
+  @Since("4.0.0")
+  def insertIntoTable(
+      table: String,
+      fields: Array[StructField]): String = {
+    val placeholders = fields.map(_ => "?").mkString(",")
+    val columns = fields.map(x => quoteIdentifier(x.name)).mkString(",")
+    s"INSERT INTO $table ($columns) VALUES ($placeholders)"
+  }
+
   /**
    * Get the SQL query that should be used to find if the given table exists. Dialects can
    * override this method to return a query that works best in a particular database.
@@ -542,6 +560,17 @@ abstract class JdbcDialect extends Serializable with Logging {
     }
   }
 
+  /**
+   * Build a SQL statement to drop the given table.
+   *
+   * @param table the table name
+   * @return The SQL statement to use for dropping the table.
+   */
+  @Since("4.0.0")
+  def dropTable(table: String): String = {
+    s"DROP TABLE $table"
+  }
+
   /**
    * Build a create index SQL statement.
    *
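
For context on wiring this up end to end: a custom dialect only takes effect once registered, e.g. via `JdbcDialects.registerDialect` (reusing the hypothetical `Neo4jDialect` sketched in the commit message above):

```scala
import org.apache.spark.sql.jdbc.JdbcDialects

// After registration, JdbcUtils.dropTable and JdbcUtils.getInsertStatement
// resolve this dialect via JdbcDialects.get(options.url) and invoke the
// overridden dropTable / insertIntoTable hooks.
JdbcDialects.registerDialect(Neo4jDialect)
```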

