rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1159091700


##########
python/pyspark/sql/tests/connect/test_parity_dataframe.py:
##########
@@ -41,6 +41,11 @@ def test_observe(self):
     def test_observe_str(self):
         super().test_observe_str()
 
+    # TODO(SPARK-XXXXX): Support Structured Streaming

Review Comment:
   This test does not use streaming.



##########
python/pyspark/sql/dataframe.py:
##########
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame":
             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
 
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame":
+        """Return a new :class:`DataFrame` with duplicate rows removed,
+         optionally only considering certain columns, within watermark.
+
+        For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming

Review Comment:
   Remove 'just'



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3025,107 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time
+   * distance of earliest and latest events are smaller than the delay threshold of watermark."
+   * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are
+   * encouraged to set the delay threshold of watermark longer than max timestamp differences
+   * among duplicated events. In addition, too late data older than watermark will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time
+   * distance of earliest and latest events are smaller than the delay threshold of watermark."
+   * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are
+   * encouraged to set the delay threshold of watermark longer than max timestamp differences
+   * among duplicated events. In addition, too late data older than watermark will be dropped.

Review Comment:
   > In addition, too late data older than watermark will be dropped.
   
   Why is this sentence required here? It would be simpler to remove it.



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2132,6 +2132,61 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+</div>
+
+</div>
+
+Specifically for streaming, you can deduplicate records in data streams using a unique identifier in the events, within the time range of watermark.
+For example, if you set the delay threshold of watermark as "1 hour", duplicated events which occurred within 1 hour can be correctly deduplicated.
+(For more details, please refer to the API doc of [dropDuplicatesWithinWatermark](/api/scala/org/apache/spark/sql/Dataset.html#dropDuplicatesWithinWatermark():org.apache.spark.sql.Dataset[T]).)
+
+This can be used to deal with use case where event time column cannot be a part of unique identifier, mostly due to the case
+where event times are somehow different for the same records. (E.g. non-idempotent writer where issuing event time happens at write)
+
+Users are encouraged to set the delay threshold of watermark longer than max timestamp differences among duplicated events.
+
+This feature requires watermark with delay threshold to be set in streaming DataFrame/Dataset.
+
+<div class="codetabs">
+
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+streamingDf = spark.readStream. ...
+
+# deduplicate using guid column with watermark based on eventTime column
+streamingDf \
+  .withWatermark("eventTime", "10 seconds") \
+  .dropDuplicatesWithinWatermark("guid")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
+
+// deduplicate using guid column with watermark based on eventTime column
+streamingDf
+  .withWatermark("eventTime", "10 seconds")

Review Comment:
   How about "1 hour" or "1 day"? 10 seconds seems very low for most use cases of this feature.
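
   To illustrate why the threshold matters, here is a rough pure-Python sketch of the semantic quoted in the Scaladoc ("Events are deduplicated as long as the time distance of earliest and latest events are smaller than the delay threshold of watermark"). It is a toy model under stated assumptions, not Spark's implementation: the function name `dedup_within_watermark` and the per-key expiry map are hypothetical, and the watermark here advances on every event, whereas a real streaming query advances it between micro-batches.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, datetime]  # (guid, event_time); hypothetical shape for illustration


def dedup_within_watermark(events: List[Event], delay: timedelta) -> List[Event]:
    """Toy model of deduplication within a watermark delay (not Spark code)."""
    expiry_by_guid: Dict[str, datetime] = {}  # guid -> time after which state is dropped
    max_event_time: Optional[datetime] = None
    output: List[Event] = []

    for guid, event_time in events:
        # Assumption: the watermark trails the max event time seen so far by `delay`.
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - delay

        # Evict state whose expiry is at or before the watermark.
        for key in [k for k, exp in expiry_by_guid.items() if exp <= watermark]:
            del expiry_by_guid[key]

        if event_time < watermark:
            continue  # too late: older than the watermark, dropped
        if guid in expiry_by_guid:
            continue  # duplicate seen within the delay threshold, dropped

        expiry_by_guid[guid] = event_time + delay
        output.append((guid, event_time))

    return output


t0 = datetime(2023, 4, 1, 12, 0, 0)
events = [
    ("a", t0),
    ("a", t0 + timedelta(seconds=30)),  # same guid, event time differs by 30 seconds
    ("b", t0 + timedelta(seconds=40)),
]

with_10s = dedup_within_watermark(events, timedelta(seconds=10))  # second "a" survives
with_1h = dedup_within_watermark(events, timedelta(hours=1))      # second "a" is removed
```

   In this model the 10-second threshold lets the second "a" through because its state was already evicted, while the 1-hour threshold removes it, which is the reason for suggesting a longer value in the example.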



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

