This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 93af0848e46 [SPARK-44710][CONNECT] Add 
Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client
93af0848e46 is described below

commit 93af0848e467fe4d58c0fb1242b738931390d6f8
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Tue Aug 8 15:05:18 2023 +0200

    [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark 
Connect Scala Client
    
    ### What changes were proposed in this pull request?
    This PR adds `Dataset.dropDuplicatesWithinWatermark` to the Spark Connect 
Scala Client.
    
    ### Why are the changes needed?
    Increase compatibility with the current sql/core APIs.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds a new method to the scala client.
    
    ### How was this patch tested?
    Added a new (rudimentary) test to `ClientStreamingQuerySuite`.
    
    Closes #42384 from hvanhovell/SPARK-44710.
    
    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  39 +++++++++++----------
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  20 +++++++++++
 .../resources/query-tests/queries/distinct.json    |   3 +-
 .../query-tests/queries/distinct.proto.bin         | Bin 50 -> 52 bytes
 .../query-tests/queries/dropDuplicates.json        |   3 +-
 .../query-tests/queries/dropDuplicates.proto.bin   | Bin 50 -> 52 bytes
 .../queries/dropDuplicates_names_array.json        |   3 +-
 .../queries/dropDuplicates_names_array.proto.bin   | Bin 55 -> 57 bytes
 .../queries/dropDuplicates_names_seq.json          |   3 +-
 .../queries/dropDuplicates_names_seq.proto.bin     | Bin 54 -> 56 bytes
 .../queries/dropDuplicates_varargs.json            |   3 +-
 .../queries/dropDuplicates_varargs.proto.bin       | Bin 58 -> 60 bytes
 12 files changed, 51 insertions(+), 23 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 8a7dce3987a..5f263903c8b 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2399,6 +2399,19 @@ class Dataset[T] private[sql] (
         .addAllColumnNames(cols.asJava)
   }
 
+  private def buildDropDuplicates(
+      columns: Option[Seq[String]],
+      withinWaterMark: Boolean): Dataset[T] = sparkSession.newDataset(encoder) 
{ builder =>
+    val dropBuilder = builder.getDeduplicateBuilder
+      .setInput(plan.getRoot)
+      .setWithinWatermark(withinWaterMark)
+    if (columns.isDefined) {
+      dropBuilder.addAllColumnNames(columns.get.asJava)
+    } else {
+      dropBuilder.setAllColumnsAsKeys(true)
+    }
+  }
+
   /**
    * Returns a new Dataset that contains only the unique rows from this 
Dataset. This is an alias
    * for `distinct`.
@@ -2406,11 +2419,7 @@ class Dataset[T] private[sql] (
    * @group typedrel
    * @since 3.4.0
    */
-  def dropDuplicates(): Dataset[T] = sparkSession.newDataset(encoder) { 
builder =>
-    builder.getDeduplicateBuilder
-      .setInput(plan.getRoot)
-      .setAllColumnsAsKeys(true)
-  }
+  def dropDuplicates(): Dataset[T] = buildDropDuplicates(None, withinWaterMark 
= false)
 
   /**
    * (Scala-specific) Returns a new Dataset with duplicate rows removed, 
considering only the
@@ -2419,11 +2428,8 @@ class Dataset[T] private[sql] (
    * @group typedrel
    * @since 3.4.0
    */
-  def dropDuplicates(colNames: Seq[String]): Dataset[T] = 
sparkSession.newDataset(encoder) {
-    builder =>
-      builder.getDeduplicateBuilder
-        .setInput(plan.getRoot)
-        .addAllColumnNames(colNames.asJava)
+  def dropDuplicates(colNames: Seq[String]): Dataset[T] = {
+    buildDropDuplicates(Option(colNames), withinWaterMark = false)
   }
 
   /**
@@ -2443,16 +2449,14 @@ class Dataset[T] private[sql] (
    */
   @scala.annotation.varargs
   def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
-    val colNames: Seq[String] = col1 +: cols
-    dropDuplicates(colNames)
+    dropDuplicates(col1 +: cols)
   }
 
-  def dropDuplicatesWithinWatermark(): Dataset[T] = {
-    dropDuplicatesWithinWatermark(this.columns)
-  }
+  def dropDuplicatesWithinWatermark(): Dataset[T] =
+    buildDropDuplicates(None, withinWaterMark = true)
 
   def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = {
-    throw new UnsupportedOperationException("dropDuplicatesWithinWatermark is 
not implemented.")
+    buildDropDuplicates(Option(colNames), withinWaterMark = true)
   }
 
   def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = {
@@ -2461,8 +2465,7 @@ class Dataset[T] private[sql] (
 
   @scala.annotation.varargs
   def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = 
{
-    val colNames: Seq[String] = col1 +: cols
-    dropDuplicatesWithinWatermark(colNames)
+    dropDuplicatesWithinWatermark(col1 +: cols)
   }
 
   /**
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index ebd3d037bba..074cf170dd3 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -1183,6 +1183,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper with PrivateM
     val joined = ds1.joinWith(ds2, $"a.value._1" === $"b.value._2", "inner")
     checkSameResult(Seq((Some((2, 3)), Some((1, 2)))), joined)
   }
+
+  test("dropDuplicatesWithinWatermark not supported in batch DataFrame") {
+    def testAndVerify(df: Dataset[_]): Unit = {
+      val exc = intercept[AnalysisException] {
+        df.write.format("noop").mode(SaveMode.Append).save()
+      }
+
+      assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not 
supported"))
+      assert(exc.getMessage.contains("batch DataFrames/DataSets"))
+    }
+
+    val result = spark.range(10).dropDuplicatesWithinWatermark()
+    testAndVerify(result)
+
+    val result2 = spark
+      .range(10)
+      .withColumn("newcol", col("id"))
+      .dropDuplicatesWithinWatermark("newcol")
+    testAndVerify(result2)
+  }
 }
 
 private[sql] case class ClassData(a: String, b: Int)
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/distinct.json 
b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
index ae796b52035..15c320d462b 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
@@ -11,6 +11,7 @@
         "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
       }
     },
-    "allColumnsAsKeys": true
+    "allColumnsAsKeys": true,
+    "withinWatermark": false
   }
 }
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
index 07430c43831..078223b1f3e 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
index ae796b52035..15c320d462b 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
@@ -11,6 +11,7 @@
         "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
       }
     },
-    "allColumnsAsKeys": true
+    "allColumnsAsKeys": true,
+    "withinWatermark": false
   }
 }
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
index 07430c43831..078223b1f3e 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
index e72e23c86ca..23df6972a51 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
@@ -11,6 +11,7 @@
         "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
       }
     },
-    "columnNames": ["a", "id"]
+    "columnNames": ["a", "id"],
+    "withinWatermark": false
   }
 }
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
index c8e3885fbf8..3bdbeb0d386 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
index 754cecac4b2..6ef72770b9a 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
@@ -11,6 +11,7 @@
         "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
       }
     },
-    "columnNames": ["a", "b"]
+    "columnNames": ["a", "b"],
+    "withinWatermark": false
   }
 }
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
index 1a2d635e58e..65b4942c568 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
index c4a8df30c58..2b6d46a3135 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
@@ -11,6 +11,7 @@
         "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
       }
     },
-    "columnNames": ["a", "b", "id"]
+    "columnNames": ["a", "b", "id"],
+    "withinWatermark": false
   }
 }
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
index 719a373c2e3..57f0d7e5afa 100644
Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
 and 
b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
 differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to