[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-07 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160800555


##
python/pyspark/sql/dataframe.py:
##
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
 jdf = self._jdf.dropDuplicates(self._jseq(subset))
 return DataFrame(jdf, self.sparkSession)
 
+def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+"""Return a new :class:`DataFrame` with duplicate rows removed,
+ optionally only considering certain columns, within watermark.
+
+For a static batch :class:`DataFrame`, it just drops duplicate rows. 
For a streaming
+:class:`DataFrame`, this will keep all data across triggers as 
intermediate state to drop
+duplicated rows. The state will be kept to guarantee the semantic, 
"Events are deduplicated
+as long as the time distance of earliest and latest events are smaller 
than the delay
+threshold of watermark." The watermark for the input 
:class:`DataFrame` must be set via
+:func:`withWatermark`. Users are encouraged to set the delay threshold 
of watermark longer

Review Comment:
   dropDuplicates() does not guarantee exactly the same output between batch and streaming either; no stateful operation offers guarantees in the presence of late records. What is the difference here? It would be better to support batch in the same manner as dropDuplicates().
   I don't think it is good UX for customers to get errors first and then have us fix it by relaxing the restriction later.
   But I will leave the decision to you.
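   (For illustration, a minimal sketch of the behaviour argued for above; the batch DataFrame, its path, and its `guid`/`eventTime` columns are assumptions, not part of the PR.)

       # Hypothetical sketch: in batch, dropDuplicatesWithinWatermark would simply
       # behave like dropDuplicates on the chosen subset, with any watermark ignored.
       batch_df = spark.read.parquet("/tmp/events")  # assumed columns: guid, eventTime, ...
       a = batch_df.dropDuplicates(["guid"])
       b = (batch_df
            .withWatermark("eventTime", "1 hour")      # no-op on a batch DataFrame
            .dropDuplicatesWithinWatermark(["guid"]))
       # Under the behaviour suggested above, a and b contain the same rows.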






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160032216


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -3038,6 +3025,107 @@ class Dataset[T] private[sql](
 dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a 
streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the semantic, "Events are deduplicated as 
long as the time
+   * distance of earliest and latest events are smaller than the delay 
threshold of watermark."
+   * The watermark for the input [[Dataset]] must be set via 
[[withWatermark]]. Users are
+   * encouraged to set the delay threshold of watermark longer than max 
timestamp differences
+   * among duplicated events. In addition, too late data older than watermark 
will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a 
streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the semantic, "Events are deduplicated as 
long as the time
+   * distance of earliest and latest events are smaller than the delay 
threshold of watermark."
+   * The watermark for the input [[Dataset]] must be set via 
[[withWatermark]]. Users are
+   * encouraged to set the delay threshold of watermark longer than max 
timestamp differences
+   * among duplicated events. In addition, too late data older than watermark 
will be dropped.

Review Comment:
   It is a statement on how the watermark works, not related to dropDuplicates().
   It may be better to say 'Note that ...' rather than 'In addition ...'






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160030393


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
   d.withNewChildren(Seq(simplifyUnion(u)))
 case d @ Deduplicate(_, u: Union) =>
   d.withNewChildren(Seq(simplifyUnion(u)))
+case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   Yeah, went with the author's preferred two-node option. @HeartSaVioR reused a lot of the code.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160030393


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
   d.withNewChildren(Seq(simplifyUnion(u)))
 case d @ Deduplicate(_, u: Union) =>
   d.withNewChildren(Seq(simplifyUnion(u)))
+case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   Yeah, went with the two-node option. @HeartSaVioR reused a lot of the code.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160027853


##
python/pyspark/sql/dataframe.py:
##
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
 jdf = self._jdf.dropDuplicates(self._jseq(subset))
 return DataFrame(jdf, self.sparkSession)
 
+def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+"""Return a new :class:`DataFrame` with duplicate rows removed,
+ optionally only considering certain columns, within watermark.
+
+For a static batch :class:`DataFrame`, it just drops duplicate rows. 
For a streaming
+:class:`DataFrame`, this will keep all data across triggers as 
intermediate state to drop
+duplicated rows. The state will be kept to guarantee the semantic, 
"Events are deduplicated
+as long as the time distance of earliest and latest events are smaller 
than the delay
+threshold of watermark." The watermark for the input 
:class:`DataFrame` must be set via
+:func:`withWatermark`. Users are encouraged to set the delay threshold 
of watermark longer
+than max timestamp differences among duplicated events. In addition, 
too late data older
+than watermark will be dropped.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ --
+ subset : List of column names, optional
+ List of columns to use for duplicate comparison (default All 
columns).
+
+ Returns
+ ---
+ :class:`DataFrame`
+ DataFrame without duplicates.
+
+ Examples
+ 
+ >>> from pyspark.sql import Row
+ >>> df = spark.createDataFrame([
+ ... Row(name='Alice', age=5, height=80),
+ ... Row(name='Alice', age=5, height=80),
+ ... Row(name='Alice', age=10, height=80)
+ ... ])
+
+ Deduplicate the same rows.
+
+ >>> df.dropDuplicatesWithinWatermark().show()

Review Comment:
   Also, even if it is a batch example, it is better to set a watermark. It will serve as a good example for users, since most of them will be looking to use this in a streaming context.
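   (A sketch of what the docstring example could look like with a watermark set; the added `time`/`eventTime` columns are assumptions for illustration, not the PR's final doctest.)

       >>> from pyspark.sql import Row
       >>> from pyspark.sql.functions import timestamp_seconds
       >>> df = spark.createDataFrame([
       ...     Row(name='Alice', age=5, height=80, time=1),
       ...     Row(name='Alice', age=5, height=80, time=2),
       ...     Row(name='Alice', age=10, height=80, time=3)
       ... ]).withColumn("eventTime", timestamp_seconds("time"))
       >>> df.withWatermark("eventTime", "1 hour") \
       ...     .dropDuplicatesWithinWatermark(['name', 'height']).show()  # doctest: +SKIP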






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160029232


##
python/pyspark/sql/dataframe.py:
##
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
 jdf = self._jdf.dropDuplicates(self._jseq(subset))
 return DataFrame(jdf, self.sparkSession)
 
+def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+"""Return a new :class:`DataFrame` with duplicate rows removed,
+ optionally only considering certain columns, within watermark.
+
+For a static batch :class:`DataFrame`, it just drops duplicate rows. 
For a streaming
+:class:`DataFrame`, this will keep all data across triggers as 
intermediate state to drop
+duplicated rows. The state will be kept to guarantee the semantic, 
"Events are deduplicated
+as long as the time distance of earliest and latest events are smaller 
than the delay
+threshold of watermark." The watermark for the input 
:class:`DataFrame` must be set via
+:func:`withWatermark`. Users are encouraged to set the delay threshold 
of watermark longer
+than max timestamp differences among duplicated events. In addition, 
too late data older
+than watermark will be dropped.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ --
+ subset : List of column names, optional
+ List of columns to use for duplicate comparison (default All 
columns).
+
+ Returns
+ ---
+ :class:`DataFrame`
+ DataFrame without duplicates.
+
+ Examples
+ 
+ >>> from pyspark.sql import Row
+ >>> df = spark.createDataFrame([
+ ... Row(name='Alice', age=5, height=80),
+ ... Row(name='Alice', age=5, height=80),
+ ... Row(name='Alice', age=10, height=80)
+ ... ])
+
+ Deduplicate the same rows.
+
+ >>> df.dropDuplicatesWithinWatermark().show()
 + +-----+---+------+
 + | name|age|height|
 + +-----+---+------+
 + |Alice|  5|    80|
 + |Alice| 10|    80|
 + +-----+---+------+
+
+ Deduplicate values on 'name' and 'height' columns.
+
+ >>> df.dropDuplicatesWithinWatermark(['name', 'height']).show()

Review Comment:
   +1. Let's add withWatermark() even if this is a batch example. Better still is to make it a streaming example (use the rate source, and we relax the test assert).
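   (A rough sketch of the streaming variant suggested here, using the rate source with a relaxed assertion; the derived `guid` column is an assumption, not the PR's actual doctest.)

       >>> from pyspark.sql.functions import col
       >>> sdf = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
       ...        .withColumn("guid", (col("value") % 5).cast("string"))
       ...        .withWatermark("timestamp", "1 hour")
       ...        .dropDuplicatesWithinWatermark(["guid"]))
       >>> sdf.isStreaming  # relaxed check instead of asserting exact output rows
       True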






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160025207


##
python/pyspark/sql/dataframe.py:
##
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
 jdf = self._jdf.dropDuplicates(self._jseq(subset))
 return DataFrame(jdf, self.sparkSession)
 
+def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+"""Return a new :class:`DataFrame` with duplicate rows removed,
+ optionally only considering certain columns, within watermark.
+
+For a static batch :class:`DataFrame`, it just drops duplicate rows. 
For a streaming
+:class:`DataFrame`, this will keep all data across triggers as 
intermediate state to drop
+duplicated rows. The state will be kept to guarantee the semantic, 
"Events are deduplicated
+as long as the time distance of earliest and latest events are smaller 
than the delay
+threshold of watermark." The watermark for the input 
:class:`DataFrame` must be set via
+:func:`withWatermark`. Users are encouraged to set the delay threshold 
of watermark longer

Review Comment:
   [Updated my comment]: I think batch should not require a watermark. But all our examples, including batch examples, should set a watermark to emphasize its importance.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-06 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1160025207


##
python/pyspark/sql/dataframe.py:
##
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
 jdf = self._jdf.dropDuplicates(self._jseq(subset))
 return DataFrame(jdf, self.sparkSession)
 
+def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+"""Return a new :class:`DataFrame` with duplicate rows removed,
+ optionally only considering certain columns, within watermark.
+
+For a static batch :class:`DataFrame`, it just drops duplicate rows. 
For a streaming
+:class:`DataFrame`, this will keep all data across triggers as 
intermediate state to drop
+duplicated rows. The state will be kept to guarantee the semantic, 
"Events are deduplicated
+as long as the time distance of earliest and latest events are smaller 
than the delay
+threshold of watermark." The watermark for the input 
:class:`DataFrame` must be set via
+:func:`withWatermark`. Users are encouraged to set the delay threshold 
of watermark longer

Review Comment:
   I think it is better to include a watermark since this serves as important documentation for the users. Most of the time, the user is looking for an example to use in streaming; not including a watermark here is going to be confusing for them.
   The fact that it is ignored in batch is fine.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-04-05 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1159091700


##
python/pyspark/sql/tests/connect/test_parity_dataframe.py:
##
@@ -41,6 +41,11 @@ def test_observe(self):
 def test_observe_str(self):
 super().test_observe_str()
 
+# TODO(SPARK-X): Support Structured Streaming

Review Comment:
   This test does not use streaming.



##
python/pyspark/sql/dataframe.py:
##
@@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
 jdf = self._jdf.dropDuplicates(self._jseq(subset))
 return DataFrame(jdf, self.sparkSession)
 
+def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+"""Return a new :class:`DataFrame` with duplicate rows removed,
+ optionally only considering certain columns, within watermark.
+
+For a static batch :class:`DataFrame`, it just drops duplicate rows. 
For a streaming

Review Comment:
   Remove 'just'



##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -3038,6 +3025,107 @@ class Dataset[T] private[sql](
 dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a 
streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the semantic, "Events are deduplicated as 
long as the time
+   * distance of earliest and latest events are smaller than the delay 
threshold of watermark."
+   * The watermark for the input [[Dataset]] must be set via 
[[withWatermark]]. Users are
+   * encouraged to set the delay threshold of watermark longer than max 
timestamp differences
+   * among duplicated events. In addition, too late data older than watermark 
will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * within watermark.
+   *
+   * For a static batch [[Dataset]], it just drops duplicate rows. For a 
streaming [[Dataset]],
+   * this will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the semantic, "Events are deduplicated as 
long as the time
+   * distance of earliest and latest events are smaller than the delay 
threshold of watermark."
+   * The watermark for the input [[Dataset]] must be set via 
[[withWatermark]]. Users are
+   * encouraged to set the delay threshold of watermark longer than max 
timestamp differences
+   * among duplicated events. In addition, too late data older than watermark 
will be dropped.

Review Comment:
   > In addition, too late data older than watermark will be dropped.
   
   Why is it required here? Simpler to remove? 



##
docs/structured-streaming-programming-guide.md:
##
@@ -2132,6 +2132,61 @@ streamingDf <- withWatermark(streamingDf, "eventTime", 
"10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+
+
+
+
+Specifically for streaming, you can deduplicate records in data streams using 
a unique identifier in the events, within the time range of watermark.
+For example, if you set the delay threshold of watermark as "1 hour", 
duplicated events which occurred within 1 hour can be correctly deduplicated.
+(For more details, please refer to the API doc of 
[dropDuplicatesWithinWatermark](/api/scala/org/apache/spark/sql/Dataset.html#dropDuplicatesWithinWatermark():org.apache.spark.sql.Dataset[T]).)
+
+This can be used to deal with use case where event time column cannot be a 
part of unique identifier, mostly due to the case
+where event times are somehow different for the same records. (E.g. 
non-idempotent writer where issuing event time happens at write)
+
+Users are encouraged to set the delay threshold of watermark longer than max 
timestamp differences among duplicated events.
+
+This feature requires watermark with delay threshold to be set in streaming 
DataFrame/Dataset.
+
+
+
+
+
+{% highlight python %}
+streamingDf = spark.readStream. ...
+
+# deduplicate using guid column with watermark based on eventTime column
+streamingDf \
+  .withWatermark("eventTime", "10 seconds") \
+  .dropDuplicatesWithinWatermark("guid")
+{% endhighlight %}
+
+
+
+
+
+{% highlight scala %}
+val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
+
+// deduplicate using guid column with watermark based on eventTime column
+streamingDf
+  .withWatermark("eventTime", "10 seconds")

Review Comment:
   How about "1 hour" or "1 day"? 10 seconds seems very low for most use cases 
of this. 
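   (For example, the guide's Python snippet might read as follows with a longer threshold; a sketch only, in the guide's own placeholder style, with the subset passed as a list.)

       streamingDf = spark.readStream. ...  # columns: guid, eventTime, ...

       # deduplicate using guid column with a 1 hour watermark on eventTime
       streamingDf \
         .withWatermark("eventTime", "1 hour") \
         .dropDuplicatesWithinWatermark(["guid"])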

[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-31 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1154705202


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   If we are saving long, I would suggesting changing the name to `expiresAtMs` 
or `expiresAtMicros` depending on it unit. 
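   (A sketch of the suggested rename, assuming the value is kept in microseconds as the quoted code suggests; the PR defines this schema in Scala, so the PySpark types here are purely illustrative.)

       from pyspark.sql.types import StructType, StructField, LongType
       # one Long value per key, holding the expiration timestamp in microseconds
       schema_for_value_row = StructType([StructField("expiresAtMicros", LongType(), False)])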



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   If we are saving long, I would suggest changing the name to `expiresAtMs` or 
`expiresAtMicros` depending on it unit. 






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-31 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1154700264


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   So the type of value in the state will be Long? SGTM.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-30 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153915858


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.timestamp_seconds
+
+class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest 
{
+
+  import testImplicits._
+
+  test("deduplicate without event time column") {
+def testAndVerify(df: Dataset[_]): Unit = {
+  val exc = intercept[AnalysisException] {
+df.writeStream.format("noop").start()
+  }
+
+  assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not 
supported"))

Review Comment:
   Optional.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-30 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153915713


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.timestamp_seconds
+
+class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest 
{
+
+  import testImplicits._
+
+  test("deduplicate without event time column") {
+def testAndVerify(df: Dataset[_]): Unit = {
+  val exc = intercept[AnalysisException] {
+df.writeStream.format("noop").start()
+  }
+
+  assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not 
supported"))
+  assert(exc.getMessage.contains("streaming DataFrames/DataSets without 
watermark"))
+}
+
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicatesWithinWatermark()
+testAndVerify(result)
+
+val result2 = inputData.toDS().withColumn("newcol", $"value")
+  .dropDuplicatesWithinWatermark("newcol")
+testAndVerify(result2)
+
+val inputData2 = MemoryStream[(String, Int)]
+val otherSideForJoin = inputData2.toDF()
+  .select($"_1" as "key", timestamp_seconds($"_2") as "time")
+  .withWatermark("Time", "10 seconds")
+
+val result3 = inputData.toDS()
+  .select($"value".as("key"))
+  // there are two streams which one stream only defines the watermark. 
the stream which
+  // contains dropDuplicatesWithinWatermark does not define the watermark, 
which is not
+  // supported.
+  .dropDuplicatesWithinWatermark()
+  .join(otherSideForJoin, "key")
+testAndVerify(result3)
+  }
+
+  test("deduplicate with all columns with event time column") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", timestamp_seconds($"value"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicatesWithinWatermark()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  // Advance watermark to 5 secs, no-data-batch does not drop state rows
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckAnswer(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  // Advance watermark to 7 secs, no-data-batch does not drop state rows
+  AddData(inputData, (13 to 17): _*),
+  // 13 to 15 are duplicated
+  CheckNewAnswer(16, 17),
+  assertNumStateRows(total = 8, updated = 2),
+
+  AddData(inputData, 5), // Should not emit anything as data less than 
watermark
+  CheckNewAnswer(),
+  assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1),
+
+  // Advance watermark to 25 secs, no-data-batch drops state rows having 
expired time <= 25
+  AddData(inputData, 35),
+  CheckNewAnswer(35),
+  assertNumStateRows(total = 3, updated = 1),
+
+  // Advance watermark to 45 seconds, no-data-batch drops state rows 
having expired time <= 45
+  AddData(inputData, 55),
+  CheckNewAnswer(55),
+  assertNumStateRows(total = 1, updated = 1)
+)
+  }
+
+  test("deduplicate with some columns with event time column") {

Review Comment:
   Isn't this testing the case where the "eventTime" column is not one of the dedup columns?
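   (A hypothetical PySpark equivalent of the case under discussion, assuming a streaming DataFrame `events` with columns `_1` and `eventTime`.)

       deduped = (events
                  .withWatermark("eventTime", "2 seconds")
                  .dropDuplicatesWithinWatermark(["_1"]))  # "eventTime" deliberately not in the subset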






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-30 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153915130


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.timestamp_seconds
+
+class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest 
{
+
+  import testImplicits._
+
+  test("deduplicate without event time column") {
+def testAndVerify(df: Dataset[_]): Unit = {
+  val exc = intercept[AnalysisException] {
+df.writeStream.format("noop").start()
+  }
+
+  assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not 
supported"))
+  assert(exc.getMessage.contains("streaming DataFrames/DataSets without 
watermark"))
+}
+
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicatesWithinWatermark()
+testAndVerify(result)
+
+val result2 = inputData.toDS().withColumn("newcol", $"value")
+  .dropDuplicatesWithinWatermark("newcol")
+testAndVerify(result2)
+
+val inputData2 = MemoryStream[(String, Int)]
+val otherSideForJoin = inputData2.toDF()
+  .select($"_1" as "key", timestamp_seconds($"_2") as "time")
+  .withWatermark("Time", "10 seconds")
+
+val result3 = inputData.toDS()
+  .select($"value".as("key"))
+  // there are two streams which one stream only defines the watermark. 
the stream which
+  // contains dropDuplicatesWithinWatermark does not define the watermark, 
which is not
+  // supported.
+  .dropDuplicatesWithinWatermark()
+  .join(otherSideForJoin, "key")
+testAndVerify(result3)
+  }
+
+  test("deduplicate with all columns with event time column") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", timestamp_seconds($"value"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicatesWithinWatermark()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  // Advance watermark to 5 secs, no-data-batch does not drop state rows
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckAnswer(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  // Advance watermark to 7 secs, no-data-batch does not drop state rows
+  AddData(inputData, (13 to 17): _*),
+  // 13 to 15 are duplicated
+  CheckNewAnswer(16, 17),
+  assertNumStateRows(total = 8, updated = 2),
+
+  AddData(inputData, 5), // Should not emit anything as data less than 
watermark
+  CheckNewAnswer(),
+  assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1),
+
+  // Advance watermark to 25 secs, no-data-batch drops state rows having 
expired time <= 25
+  AddData(inputData, 35),
+  CheckNewAnswer(35),
+  assertNumStateRows(total = 3, updated = 1),
+
+  // Advance watermark to 45 seconds, no-data-batch drops state rows 
having expired time <= 45
+  AddData(inputData, 55),
+  CheckNewAnswer(55),
+  assertNumStateRows(total = 1, updated = 1)
+)
+  }
+
+  test("deduplicate with some columns with event time column") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS()
+  .withColumn("eventTime", timestamp_seconds($"_2"))
+  .withWatermark("eventTime", "2 seconds")
+  .dropDuplicatesWithinWatermark("_1")
+  .select($"_1", $"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  // Advances watermark to 15
+  AddData(inputData, "a" -> 17),
+  CheckNewAnswer("a" -> 17),
+  // expired time is set to 19
+  assertNumStateRows(total = 1, updated = 1),
+
+  // Watermark does not advance
+  AddData(inputData, "a" -> 16),
+  CheckNewAnswer(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  // 

[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-30 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153914621


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.timestamp_seconds
+
+class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest 
{
+
+  import testImplicits._
+
+  test("deduplicate without event time column") {
+def testAndVerify(df: Dataset[_]): Unit = {
+  val exc = intercept[AnalysisException] {
+df.writeStream.format("noop").start()
+  }
+
+  assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not 
supported"))
+  assert(exc.getMessage.contains("streaming DataFrames/DataSets without 
watermark"))
+}
+
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicatesWithinWatermark()
+testAndVerify(result)
+
+val result2 = inputData.toDS().withColumn("newcol", $"value")
+  .dropDuplicatesWithinWatermark("newcol")
+testAndVerify(result2)
+
+val inputData2 = MemoryStream[(String, Int)]
+val otherSideForJoin = inputData2.toDF()
+  .select($"_1" as "key", timestamp_seconds($"_2") as "time")
+  .withWatermark("Time", "10 seconds")
+
+val result3 = inputData.toDS()
+  .select($"value".as("key"))
+  // there are two streams which one stream only defines the watermark. 
the stream which
+  // contains dropDuplicatesWithinWatermark does not define the watermark, 
which is not
+  // supported.
+  .dropDuplicatesWithinWatermark()
+  .join(otherSideForJoin, "key")
+testAndVerify(result3)
+  }
+
+  test("deduplicate with all columns with event time column") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", timestamp_seconds($"value"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicatesWithinWatermark()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  // Advance watermark to 5 secs, no-data-batch does not drop state rows
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckAnswer(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  // Advance watermark to 7 secs, no-data-batch does not drop state rows
+  AddData(inputData, (13 to 17): _*),
+  // 13 to 15 are duplicated
+  CheckNewAnswer(16, 17),
+  assertNumStateRows(total = 8, updated = 2),
+
+  AddData(inputData, 5), // Should not emit anything as data less than 
watermark
+  CheckNewAnswer(),
+  assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1),
+
+  // Advance watermark to 25 secs, no-data-batch drops state rows having 
expired time <= 25
+  AddData(inputData, 35),
+  CheckNewAnswer(35),
+  assertNumStateRows(total = 3, updated = 1),
+
+  // Advance watermark to 45 seconds, no-data-batch drops state rows 
having expired time <= 45
+  AddData(inputData, 55),
+  CheckNewAnswer(55),
+  assertNumStateRows(total = 1, updated = 1)
+)
+  }
+
+  test("deduplicate with some columns with event time column") {
+val inputData = MemoryStream[(String, Int)]
+val result = inputData.toDS()
+  .withColumn("eventTime", timestamp_seconds($"_2"))
+  .withWatermark("eventTime", "2 seconds")
+  .dropDuplicatesWithinWatermark("_1")
+  .select($"_1", $"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  // Advances watermark to 15
+  AddData(inputData, "a" -> 17),
+  CheckNewAnswer("a" -> 17),
+  // expired time is set to 19
+  assertNumStateRows(total = 1, updated = 1),
+
+  // Watermark does not advance
+  AddData(inputData, "a" -> 16),
+  CheckNewAnswer(),
+  assertNumStateRows(total = 1, updated = 0),
+
+  // 

[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-30 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153675952


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala:
##
@@ -1742,6 +1742,8 @@ class DataFrameSuite extends QueryTest
   Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2)))
   }
 
+  // FIXME: add dropDuplicatesWithinWatermark

Review Comment:
   Any reason to postpone it?



##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.timestamp_seconds
+
+class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest 
{
+
+  import testImplicits._
+
+  test("deduplicate without event time column") {
+def testAndVerify(df: Dataset[_]): Unit = {
+  val exc = intercept[AnalysisException] {
+df.writeStream.format("noop").start()
+  }
+
+  assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not 
supported"))
+  assert(exc.getMessage.contains("streaming DataFrames/DataSets without 
watermark"))
+}
+
+val inputData = MemoryStream[String]
+val result = inputData.toDS().dropDuplicatesWithinWatermark()
+testAndVerify(result)
+
+val result2 = inputData.toDS().withColumn("newcol", $"value")
+  .dropDuplicatesWithinWatermark("newcol")
+testAndVerify(result2)
+
+val inputData2 = MemoryStream[(String, Int)]
+val otherSideForJoin = inputData2.toDF()
+  .select($"_1" as "key", timestamp_seconds($"_2") as "time")
+  .withWatermark("Time", "10 seconds")
+
+val result3 = inputData.toDS()
+  .select($"value".as("key"))
+  // there are two streams which one stream only defines the watermark. 
the stream which
+  // contains dropDuplicatesWithinWatermark does not define the watermark, 
which is not
+  // supported.
+  .dropDuplicatesWithinWatermark()
+  .join(otherSideForJoin, "key")
+testAndVerify(result3)
+  }
+
+  test("deduplicate with all columns with event time column") {
+val inputData = MemoryStream[Int]
+val result = inputData.toDS()
+  .withColumn("eventTime", timestamp_seconds($"value"))
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicatesWithinWatermark()
+  .select($"eventTime".cast("long").as[Long])
+
+testStream(result, Append)(
+  // Advance watermark to 5 secs, no-data-batch does not drop state rows
+  AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+  CheckAnswer(10 to 15: _*),
+  assertNumStateRows(total = 6, updated = 6),
+
+  // Advance watermark to 7 secs, no-data-batch does not drop state rows
+  AddData(inputData, (13 to 17): _*),
+  // 13 to 15 are duplicated
+  CheckNewAnswer(16, 17),
+  assertNumStateRows(total = 8, updated = 2),
+
+  AddData(inputData, 5), // Should not emit anything as data less than 
watermark
+  CheckNewAnswer(),
+  assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1),
+
+  // Advance watermark to 25 secs, no-data-batch drops state rows having 
expired time <= 25
+  AddData(inputData, 35),
+  CheckNewAnswer(35),
+  assertNumStateRows(total = 3, updated = 1),
+
+  // Advance watermark to 45 seconds, no-data-batch drops state rows 
having expired time <= 45
+  AddData(inputData, 55),
+  CheckNewAnswer(55),
+  assertNumStateRows(total = 1, updated = 1)
+)
+  }
+
+  test("deduplicate with some columns with event time column") {

Review Comment:
   without including the event_time column, right? 



##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed 

[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-30 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153505179


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
   d.withNewChildren(Seq(simplifyUnion(u)))
 case d @ Deduplicate(_, u: Union) =>
   d.withNewChildren(Seq(simplifyUnion(u)))
+case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   @zsxwing one more thing to consider: shall we extend the existing 'Deduplicate' node to support this use case rather than introducing another logical node 'DeduplicateWithinWatermark'? It does not change the API, just the implementation. I am suggesting adding this option to the existing node.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-29 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151482426


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
   d.withNewChildren(Seq(simplifyUnion(u)))
 case d @ Deduplicate(_, u: Union) =>
   d.withNewChildren(Seq(simplifyUnion(u)))
+case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   There is no perfect watermark, and any guarantees that apply only with a "perfect watermark" are no guarantees at all.
   I don't think semantic differences require disabling batch.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-29 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151447031


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+StatefulOperatorPartitioning.getCompatibleDistribution(
+  keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = 
WatermarkSupport.findEventTimeColumn(child.output,
+allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = 
eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateInfo,
+  keyExpressions.toStructType,
+  schemaForTimeoutRow,
+  numColsPrefixKey = 0,
+  session.sessionState,
+  Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
+
+  val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+  val timeoutRow = timeoutToUnsafeRow(new 
SpecificInternalRow(schemaForTimeoutRow))
+
+  val numOutputRows = longMetric("numOutputRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+  val numRemovedStateRows = longMetric("numRemovedStateRows")
+  val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+  val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+  val commitTimeMs = longMetric("commitTimeMs")
+  val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+  val baseIterator = watermarkPredicateForDataForLateEvents match {
+case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, 
predicate)
+case None => iter
+  }
+
+  val updatesStartTimeNs = System.nanoTime
+
+  val result = baseIterator.filter { r =>
+val row = r.asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value == null) {
+  val timestamp = row.getLong(eventTimeColOrdinal)
+  // The unit of timestamp in Spark is microseconds, convert the delay 
threshold.
+  val expiresAt = timestamp + delayThresholdMillis * 1000
+
+  timeoutRow.setLong(0, expiresAt)
+  store.put(key, timeoutRow)
+
+  numUpdatedStateRows += 1
+  numOutputRows += 1
+  true
+} else {
+  // Drop duplicated rows
+  numDroppedDuplicateRows += 1
+  false
+}
+  }
+
+  CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
+allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - 
updatesStartTimeNs)
+allRemovalsTimeMs += timeTakenMs {
+  // Convert watermark value to microsecond
+  val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000
+  store.iterator().foreach { rowPair =>

Review Comment:
   It depends on the operator. E.g., for windowed aggregation: if we keep the window as the first column and the state store supports an efficient range iterator, it could minimize scanning.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-29 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151443903


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
   d.withNewChildren(Seq(simplifyUnion(u)))
 case d @ Deduplicate(_, u: Union) =>
   d.withNewChildren(Seq(simplifyUnion(u)))
+case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   dropDuplicates() produces different output between batch and streaming too (because batch does not drop late records).
   As much as possible, it is better to keep the API the same between batch and streaming.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-28 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151397406


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+StatefulOperatorPartitioning.getCompatibleDistribution(
+  keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+metrics // force lazy init at driver
+
+child.execute().mapPartitionsWithStateStore(
+  getStateInfo,
+  keyExpressions.toStructType,
+  schemaForTimeoutRow,
+  numColsPrefixKey = 0,
+  session.sessionState,
+  Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+  val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+
+  val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+  val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow))
+
+  val numOutputRows = longMetric("numOutputRows")
+  val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+  val numRemovedStateRows = longMetric("numRemovedStateRows")
+  val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+  val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+  val commitTimeMs = longMetric("commitTimeMs")
+  val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+  val baseIterator = watermarkPredicateForDataForLateEvents match {
+case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
+case None => iter
+  }
+
+  val updatesStartTimeNs = System.nanoTime
+
+  val result = baseIterator.filter { r =>
+val row = r.asInstanceOf[UnsafeRow]
+val key = getKey(row)
+val value = store.get(key)
+if (value == null) {
+  val timestamp = row.getLong(eventTimeColOrdinal)
+  // The unit of timestamp in Spark is microseconds, convert the delay threshold.
+  val expiresAt = timestamp + delayThresholdMillis * 1000
+
+  timeoutRow.setLong(0, expiresAt)
+  store.put(key, timeoutRow)
+
+  numUpdatedStateRows += 1
+  numOutputRows += 1
+  true
+} else {
+  // Drop duplicated rows
+  numDroppedDuplicateRows += 1
+  false
+}
+  }
+
+  CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
+allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
+allRemovalsTimeMs += timeTakenMs {
+  // Convert watermark value to microsecond
+  val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000
+  store.iterator().foreach { rowPair =>

Review Comment:
   [Unrelated to this PR] Do we scan the whole state in every microbatch?
Probably the same in all the stateful operators?




[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-28 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151374343


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging {
   throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
   }
 
+case d: DeduplicateWithinWatermark if d.isStreaming =>
+  // Find any attributes that are associated with an eventTime watermark.
+  val watermarkAttributes = d.child.output.collect {
+case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
+  }
+
+  // DeduplicateWithinWatermark requires event time column being set in the input DataFrame
+  if (watermarkAttributes.isEmpty) {
+throwError(
+  "dropDuplicatesWithinWatermark is not supported on streaming DataFrames/DataSets " +

Review Comment:
   [optional]
   "dropDuplicatesWithinWatermark() requires a watermark to be set on the
DataFrame, but there is no watermark set."
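
   A hypothetical repro of the case this check rejects (the built-in rate source
emits (timestamp, value) rows, but withWatermark() is never called, so starting
the query should fail with a message along those lines):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("no-watermark").getOrCreate()

    val deduped = spark.readStream.format("rate").load()
      .dropDuplicatesWithinWatermark()   // no watermark set on the input stream

    // The analysis error from UnsupportedOperationChecker surfaces when the
    // streaming query is started.
    deduped.writeStream.format("console").start()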



##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
 dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event 
times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the 
first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these 
rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, 
users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp 
differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid 
any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of 
watermark.

Review Comment:
   I know it is a tricky thing, but it might be better to rephrase. 
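
   A minimal usage sketch of the guarantee being described (column names and the
rate-source plumbing are made up just to have something runnable): with a
10-minute delay, once the first event for a guid arrives at event time ts, later
duplicates whose event times fall within (ts - 10 min, ts + 10 min) are dropped.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()

    // Hypothetical event stream: derive a guid and an event-time column from
    // the built-in rate source.
    val events = spark.readStream.format("rate").load()
      .withColumn("guid", (col("value") % 10).cast("string"))
      .withColumnRenamed("timestamp", "eventTime")

    val deduped = events
      .withWatermark("eventTime", "10 minutes")
      .dropDuplicatesWithinWatermark("guid")

    deduped.writeStream.format("console").start()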



##
docs/structured-streaming-programming-guide.md:
##
@@ -2132,6 +2132,48 @@ streamingDf <- withWatermark(streamingDf, "eventTime", 
"10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+

Review Comment:
   [From PR description]
   > Only guarantee to deduplicate events within the watermark.
   
   'within watermark delay'



##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
 dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event 
times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the 
first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these 
rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, 
users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp 
differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid 
any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of 
watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop 
duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the 
first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped