This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new f6b5fd933ed [SPARK-42733][CONNECT][FOLLOWUP] Write without path or table
f6b5fd933ed is described below

commit f6b5fd933ed41b09c733ae339118133508dd0870
Author: Zhen Li <zhenli...@users.noreply.github.com>
AuthorDate: Tue Mar 14 18:31:27 2023 +0900

    [SPARK-42733][CONNECT][FOLLOWUP] Write without path or table
    
    Fixes `DataFrameWriter.save` to work without a path or table parameter.
    Added support for the `jdbc` method in the writer, as it is one of the
    implementations that does not require a path or table.
    
    `DataFrameWriter.save` should work without a path parameter because some data
    sources, such as jdbc and noop, work without those parameters.
    This is the follow-up fix for the Scala client of
    https://github.com/apache/spark/pull/40356.
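    
    For illustration, a minimal sketch of the intended usage from the Scala client
    (assuming an active Spark Connect session `spark`; the built-in noop source
    simply discards the written rows):
    
        spark.range(10).write.format("noop").mode("append").save()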
    
    No
    
    Unit and E2E test
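    
    As a rough sketch, the jdbc round-trip covered by the E2E test looks roughly
    like the following (the in-memory Derby URL and table name are illustrative):
    
        import java.util.Properties
    
        val url = "jdbc:derby:memory:1234"
        spark.range(10).write.jdbc(s"$url;create=true", "t1", new Properties())
        val rows = spark.read.jdbc(url, "t1", new Properties()).collect()
        assert(rows.length == 10)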
    
    Closes #40358 from zhenlineo/write-without-path-table.
    
    Authored-by: Zhen Li <zhenli...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 93334e295483a0ba66e22d8398512ad970a3ea80)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 37 ++++++++++++++++++++--
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  | 21 ++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 28 ++++++++++++++++
 .../CheckConnectJvmClientCompatibility.scala       |  4 +--
 .../connect/planner/SparkConnectProtoSuite.scala   | 11 ++++---
 5 files changed, 92 insertions(+), 9 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 8434addec92..b9d1fefb105 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.util.Locale
+import java.util.{Locale, Properties}
 
 import scala.collection.JavaConverters._
 
@@ -228,7 +228,9 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
 
     // Set path or table
     f(builder)
-    require(builder.hasPath != builder.hasTable) // Only one can be set
+
+    // Cannot both be set
+    require(!(builder.hasPath && builder.hasTable))
 
     builder.setMode(mode match {
       case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
@@ -345,6 +347,37 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     })
   }
 
+  /**
+   * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
+   * table already exists in the external database, behavior of this function depends on the save
+   * mode, specified by the `mode` function (default to throwing an exception).
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * JDBC-specific option and parameter documentation for storing tables via JDBC in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @param table
+   *   Name of the table in the external database.
+   * @param connectionProperties
+   *   JDBC database connection arguments, a list of arbitrary string tag/value. Normally at least
+   *   a "user" and "password" property should be included. "batchsize" can be used to control the
+   *   number of rows per insert. "isolationLevel" can be one of "NONE", "READ_COMMITTED",
+   *   "READ_UNCOMMITTED", "REPEATABLE_READ", or "SERIALIZABLE", corresponding to standard
+   *   transaction isolation levels defined by JDBC's Connection object, with default of
+   *   "READ_UNCOMMITTED".
+   * @since 3.4.0
+   */
+  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    // connectionProperties should override settings in extraOptions.
+    this.extraOptions ++= connectionProperties.asScala
+    // explicit url and dbtable should override all
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+    format("jdbc").save()
+  }
+
   /**
    * Saves the content of the `DataFrame` in JSON format (<a href="http://jsonlines.org/"> JSON
    * Lines text format or newline-delimited JSON</a>) at the specified path. This is equivalent
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 60bb23516b0..5aa5500116d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -22,6 +22,7 @@ import java.nio.file.Files
 import scala.collection.JavaConverters._
 
 import io.grpc.StatusRuntimeException
+import java.util.Properties
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
@@ -175,6 +176,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
     }
   }
 
+  test("write without table or path") {
+    // Should receive no error to write noop
+    spark.range(10).write.format("noop").mode("append").save()
+  }
+
+  test("write jdbc") {
+    val url = "jdbc:derby:memory:1234"
+    val table = "t1"
+    try {
+      spark.range(10).write.jdbc(url = s"$url;create=true", table, new Properties())
+      val result = spark.read.jdbc(url = url, table, new Properties()).collect()
+      assert(result.length == 10)
+    } finally {
+      // clean up
+      assertThrows[StatusRuntimeException] {
+        spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
+      }
+    }
+  }
+
   test("writeTo with create and using") {
     // TODO (SPARK-42519): Add more test after we can set configs. See more WriteTo test cases
     //  in SparkConnectProtoSuite.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 42376db880b..e5738fe7acd 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import io.grpc.Server
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import java.util.Properties
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
@@ -100,6 +101,33 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(actualPlan.equals(expectedPlan))
   }
 
+  test("write jdbc") {
+    val df = ss.newDataFrame(_ => ()).limit(10)
+
+    val builder = proto.WriteOperation.newBuilder()
+    builder
+      .setInput(df.plan.getRoot)
+      .setMode(proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS)
+      .setSource("jdbc")
+      .putOptions("a", "b")
+      .putOptions("1", "2")
+      .putOptions("url", "url")
+      .putOptions("dbtable", "table")
+
+    val expectedPlan = proto.Plan
+      .newBuilder()
+      .setCommand(proto.Command.newBuilder().setWriteOperation(builder))
+      .build()
+
+    val connectionProperties = new Properties
+    connectionProperties.put("a", "b")
+    connectionProperties.put("1", "2")
+    df.write.jdbc("url", "table", connectionProperties)
+
+    val actualPlan = service.getAndClearLatestInputPlan()
+    assert(actualPlan.equals(expectedPlan))
+  }
+
   test("write V2") {
     val df = ss.newDataFrame(_ => ()).limit(10)
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index ae6c6c86fec..97d130421a2 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -130,9 +130,7 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
 
       // DataFrame Reader & Writer
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.jdbc"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameWriter.jdbc"),
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameReader.json"), // deprecated
 
       // DataFrameNaFunctions
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.DataFrameNaFunctions.this"),
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 9cc714d630b..824ee7aceb4 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 
 import com.google.protobuf.ByteString
 
-import org.apache.spark.SparkClassNotFoundException
+import org.apache.spark.{SparkClassNotFoundException, SparkIllegalArgumentException}
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Expression
 import org.apache.spark.connect.proto.Join.JoinType
@@ -554,13 +554,16 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       parameters = Map("columnName" -> "`duplicatedcol`"))
   }
 
-  // TODO(SPARK-42733): Writes without path or table should work.
-  ignore("Writes fails without path or table") {
-    assertThrows[UnsupportedOperationException] {
+  test("Writes fails without path or table") {
+    assertThrows[SparkIllegalArgumentException] {
       transform(localRelation.write())
     }
   }
 
+  test("Writes without path or table") {
+    transform(localRelation.write(format = Some("noop"), mode = Some("Append")))
+  }
+
   test("Write fails with unknown table - AnalysisException") {
     val cmd = readRel.write(tableName = Some("dest"))
     assertThrows[AnalysisException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
