[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160800555 ## python/pyspark/sql/dataframe.py: ## @@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": jdf = self._jdf.dropDuplicates(self._jseq(subset)) return DataFrame(jdf, self.sparkSession) +def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, + optionally only considering certain columns, within watermark. + +For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming +:class:`DataFrame`, this will keep all data across triggers as intermediate state to drop +duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated +as long as the time distance of earliest and latest events are smaller than the delay +threshold of watermark." The watermark for the input :class:`DataFrame` must be set via +:func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer Review Comment: dropDuplicates does not produce exactly the same output between batch and streaming either. No stateful operation guarantees that in the presence of late records. What is the difference here? Better to support batch in the same manner as dropDuplicates(). I don't think it is a good UX for customers to hit errors that we later fix by relaxing the check. But I will leave the decision to you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
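The batch behaviour suggested here can be sketched as a toy, pure-Python model (not PySpark code). For a static dataset the watermark plays no role, and, as with `dropDuplicates()`, one row survives per distinct value of the subset columns. The function name `batch_drop_duplicates` and the dict-per-row representation are hypothetical. Spark does not promise which duplicate survives, whereas this sketch keeps the first one.

```python
from typing import Dict, List, Optional, Sequence

Row = Dict[str, object]


def batch_drop_duplicates(rows: List[Row], subset: Optional[Sequence[str]] = None) -> List[Row]:
    """Keep one row per distinct key; the key is built from `subset` (default: all columns)."""
    seen = set()
    result: List[Row] = []
    for row in rows:
        columns = list(subset) if subset is not None else sorted(row)
        key = tuple(row[c] for c in columns)
        if key not in seen:  # first occurrence wins in this toy model
            seen.add(key)
            result.append(row)
    return result


rows = [
    {"name": "Alice", "age": 5, "height": 80},
    {"name": "Alice", "age": 5, "height": 80},
    {"name": "Alice", "age": 10, "height": 80},
]
all_columns = batch_drop_duplicates(rows)                  # two rows: age 5 and age 10
name_height = batch_drop_duplicates(rows, ["name", "height"])  # one row: age 5
```

These results mirror the docstring examples quoted elsewhere in this review, where no watermark is involved at all.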
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160032216 ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -3038,6 +3025,107 @@ class Dataset[T] private[sql]( dropDuplicates(colNames) } + /** + * Returns a new Dataset with duplicates rows removed, within watermark. + * + * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], + * this will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time + * distance of earliest and latest events are smaller than the delay threshold of watermark." + * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are + * encouraged to set the delay threshold of watermark longer than max timestamp differences + * among duplicated events. In addition, too late data older than watermark will be dropped. + * + * @group typedrel + * @since 3.5.0 + */ + def dropDuplicatesWithinWatermark(): Dataset[T] = { +dropDuplicatesWithinWatermark(this.columns) + } + + /** + * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, + * within watermark. + * + * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], + * this will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time + * distance of earliest and latest events are smaller than the delay threshold of watermark." + * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are + * encouraged to set the delay threshold of watermark longer than max timestamp differences + * among duplicated events. In addition, too late data older than watermark will be dropped. 
Review Comment: It is a statement on how watermark works, not related to dropDuplicates(). Maybe better to say 'Note that ...' rather than 'In addition ...'
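The point here is that late-row dropping comes from the watermark itself, not from deduplication. Below is a minimal pure-Python sketch of that general mechanism. It assumes the following:

- event times are in seconds;
- the watermark trails the maximum observed event time by the delay threshold and never moves backwards;
- a row at or behind the watermark counts as late.

The helper names are hypothetical.

```python
def next_watermark(current: int, max_event_time: int, delay: int) -> int:
    """Watermark after a batch: max event time minus the delay, never moving backwards."""
    return max(current, max_event_time - delay)


def is_late(event_time: int, watermark: int) -> bool:
    """Assumption: rows at or behind the watermark are treated as too late and dropped."""
    return event_time <= watermark


wm = next_watermark(current=0, max_event_time=17, delay=10)
print(wm, is_late(5, wm), is_late(8, wm))  # 7 True False
```

Nothing in these two helpers refers to which columns are deduplicated. That is why the sentence reads better as a "Note that ..." than as a property of this method.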
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160030393 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] { d.withNewChildren(Seq(simplifyUnion(u))) case d @ Deduplicate(_, u: Union) => d.withNewChildren(Seq(simplifyUnion(u))) +case d @ DeduplicateWithinWatermark(_, u: Union) => Review Comment: Yeah, went with the author's preferred two-node option. @HeartSaVioR reused a lot of the code.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160030393 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] { d.withNewChildren(Seq(simplifyUnion(u))) case d @ Deduplicate(_, u: Union) => d.withNewChildren(Seq(simplifyUnion(u))) +case d @ DeduplicateWithinWatermark(_, u: Union) => Review Comment: Yeah, went with the two-node option. @HeartSaVioR reused a lot of the code.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160027853 ## python/pyspark/sql/dataframe.py: ## @@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": jdf = self._jdf.dropDuplicates(self._jseq(subset)) return DataFrame(jdf, self.sparkSession) +def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, + optionally only considering certain columns, within watermark. + +For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming +:class:`DataFrame`, this will keep all data across triggers as intermediate state to drop +duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated +as long as the time distance of earliest and latest events are smaller than the delay +threshold of watermark." The watermark for the input :class:`DataFrame` must be set via +:func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer +than max timestamp differences among duplicated events. In addition, too late data older +than watermark will be dropped. + + .. versionadded:: 3.5.0 + + Parameters + -- + subset : List of column names, optional + List of columns to use for duplicate comparison (default All columns). + + Returns + --- + :class:`DataFrame` + DataFrame without duplicates. + + Examples + + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame([ + ... Row(name='Alice', age=5, height=80), + ... Row(name='Alice', age=5, height=80), + ... Row(name='Alice', age=10, height=80) + ... ]) + + Deduplicate the same rows. + + >>> df.dropDuplicatesWithinWatermark().show() Review Comment: Also, even in a batch example, it is better to set a watermark. It will serve as a good example for users, since most of them will be looking to use this in a streaming context.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160029232 ## python/pyspark/sql/dataframe.py: ## @@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": jdf = self._jdf.dropDuplicates(self._jseq(subset)) return DataFrame(jdf, self.sparkSession) +def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, + optionally only considering certain columns, within watermark. + +For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming +:class:`DataFrame`, this will keep all data across triggers as intermediate state to drop +duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated +as long as the time distance of earliest and latest events are smaller than the delay +threshold of watermark." The watermark for the input :class:`DataFrame` must be set via +:func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer +than max timestamp differences among duplicated events. In addition, too late data older +than watermark will be dropped. + + .. versionadded:: 3.5.0 + + Parameters + -- + subset : List of column names, optional + List of columns to use for duplicate comparison (default All columns). + + Returns + --- + :class:`DataFrame` + DataFrame without duplicates. + + Examples + + >>> from pyspark.sql import Row + >>> df = spark.createDataFrame([ + ... Row(name='Alice', age=5, height=80), + ... Row(name='Alice', age=5, height=80), + ... Row(name='Alice', age=10, height=80) + ... ]) + + Deduplicate the same rows. + + >>> df.dropDuplicatesWithinWatermark().show() + +-+---+--+ + | name|age|height| + +-+---+--+ + |Alice| 5|80| + |Alice| 10|80| + +-+---+--+ + + Deduplicate values on 'name' and 'height' columns. 
+ + >>> df.dropDuplicatesWithinWatermark(['name', 'height']).show() Review Comment: +1. Let's add withWatermark() even if this is a batch example. Better still, make it a streaming example (use the rate source, and relax the test assertion).
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160025207 ## python/pyspark/sql/dataframe.py: ## @@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": jdf = self._jdf.dropDuplicates(self._jseq(subset)) return DataFrame(jdf, self.sparkSession) +def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, + optionally only considering certain columns, within watermark. + +For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming +:class:`DataFrame`, this will keep all data across triggers as intermediate state to drop +duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated +as long as the time distance of earliest and latest events are smaller than the delay +threshold of watermark." The watermark for the input :class:`DataFrame` must be set via +:func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer Review Comment: [Updated my comment]: I think batch should not require a watermark. But all our examples, including batch examples, should set a watermark to emphasize its importance.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1160025207 ## python/pyspark/sql/dataframe.py: ## @@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": jdf = self._jdf.dropDuplicates(self._jseq(subset)) return DataFrame(jdf, self.sparkSession) +def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, + optionally only considering certain columns, within watermark. + +For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming +:class:`DataFrame`, this will keep all data across triggers as intermediate state to drop +duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated +as long as the time distance of earliest and latest events are smaller than the delay +threshold of watermark." The watermark for the input :class:`DataFrame` must be set via +:func:`withWatermark`. Users are encouraged to set the delay threshold of watermark longer Review Comment: I think it is better to include a watermark, since this serves as important documentation for users. Most of the time, the user is looking for an example to use in streaming. Not including a watermark here is going to be confusing for them. The fact that it is ignored in batch is fine.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1159091700 ## python/pyspark/sql/tests/connect/test_parity_dataframe.py: ## @@ -41,6 +41,11 @@ def test_observe(self): def test_observe_str(self): super().test_observe_str() +# TODO(SPARK-X): Support Structured Streaming Review Comment: This test does not use streaming. ## python/pyspark/sql/dataframe.py: ## @@ -3928,6 +3928,71 @@ def dropDuplicates(self, subset: Optional[List[str]] = None) -> "DataFrame": jdf = self._jdf.dropDuplicates(self._jseq(subset)) return DataFrame(jdf, self.sparkSession) +def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = None) -> "DataFrame": +"""Return a new :class:`DataFrame` with duplicate rows removed, + optionally only considering certain columns, within watermark. + +For a static batch :class:`DataFrame`, it just drops duplicate rows. For a streaming Review Comment: Remove 'just' ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -3038,6 +3025,107 @@ class Dataset[T] private[sql]( dropDuplicates(colNames) } + /** + * Returns a new Dataset with duplicates rows removed, within watermark. + * + * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], + * this will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time + * distance of earliest and latest events are smaller than the delay threshold of watermark." + * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are + * encouraged to set the delay threshold of watermark longer than max timestamp differences + * among duplicated events. In addition, too late data older than watermark will be dropped. 
+ * + * @group typedrel + * @since 3.5.0 + */ + def dropDuplicatesWithinWatermark(): Dataset[T] = { +dropDuplicatesWithinWatermark(this.columns) + } + + /** + * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, + * within watermark. + * + * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], + * this will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the semantic, "Events are deduplicated as long as the time + * distance of earliest and latest events are smaller than the delay threshold of watermark." + * The watermark for the input [[Dataset]] must be set via [[withWatermark]]. Users are + * encouraged to set the delay threshold of watermark longer than max timestamp differences + * among duplicated events. In addition, too late data older than watermark will be dropped. Review Comment: > In addition, too late data older than watermark will be dropped. Why is it required here? Simpler to remove? ## docs/structured-streaming-programming-guide.md: ## @@ -2132,6 +2132,61 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds") streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime") {% endhighlight %} + + + + +Specifically for streaming, you can deduplicate records in data streams using a unique identifier in the events, within the time range of watermark. +For example, if you set the delay threshold of watermark as "1 hour", duplicated events which occurred within 1 hour can be correctly deduplicated. +(For more details, please refer to the API doc of [dropDuplicatesWithinWatermark](/api/scala/org/apache/spark/sql/Dataset.html#dropDuplicatesWithinWatermark():org.apache.spark.sql.Dataset[T]).) + +This can be used to deal with use case where event time column cannot be a part of unique identifier, mostly due to the case +where event times are somehow different for the same records. (E.g. 
non-idempotent writer where issuing event time happens at write) + +Users are encouraged to set the delay threshold of watermark longer than max timestamp differences among duplicated events. + +This feature requires watermark with delay threshold to be set in streaming DataFrame/Dataset. + + + + + +{% highlight python %} +streamingDf = spark.readStream. ... + +# deduplicate using guid column with watermark based on eventTime column +streamingDf \ + .withWatermark("eventTime", "10 seconds") \ + .dropDuplicatesWithinWatermark("guid") +{% endhighlight %} + + + + + +{% highlight scala %} +val streamingDf = spark.readStream. ... // columns: guid, eventTime, ... + +// deduplicate using guid column with watermark based on eventTime column +streamingDf + .withWatermark("eventTime", "10 seconds") Review Comment: How about "1 hour" or "1 day"? 10 seconds seems very low for most use cases of this.
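The following hedged single-key sketch in pure Python shows why a 10-second delay is low for this use case. It makes these assumptions:

- event times are in seconds;
- the initial watermark is 0;
- the watermark has fully caught up with the stream before the re-sent copy arrives.

`duplicate_suppressed` is a hypothetical helper, not a Spark API. The scenario models a writer that re-stamps the event time on retry and re-sends a copy 30 seconds after the original.

```python
def duplicate_suppressed(first_ts: int, max_ts_before_dup: int, dup_ts: int, delay: int) -> bool:
    """Return True if the re-sent copy is dropped rather than emitted a second time."""
    watermark = max(0, max(first_ts, max_ts_before_dup) - delay)
    state_present = first_ts + delay > watermark  # state entry for the original not yet evicted
    late = dup_ts <= watermark                    # copy would be dropped as late data anyway
    return state_present or late


# Original at t=1000 s; the stream has reached t=1030 s when the retried copy (stamped 1030) arrives.
print(duplicate_suppressed(1000, 1030, 1030, delay=10))    # False: emitted twice
print(duplicate_suppressed(1000, 1030, 1030, delay=3600))  # True: deduplicated
```

With "10 seconds" the state entry for the original has already been evicted when the copy shows up. With "1 hour" it is still present.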
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1154705202 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ## @@ -980,3 +1022,65 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( +keyExpressions: Seq[Attribute], +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None, +eventTimeWatermarkForLateEvents: Option[Long] = None, +eventTimeWatermarkForEviction: Option[Long] = None) + extends BaseStreamingDeduplicateExec { + + protected val schemaForValueRow: StructType = StructType( +Array(StructField("expiresAt", LongType, nullable = false))) Review Comment: If we are saving a long, I would suggest changing the name to `expiresAtMs` or `expiresAtMicros`, depending on its unit.
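The unit matters because Spark SQL stores timestamps as microseconds since the epoch, while the event-time watermark and delay threshold are tracked in milliseconds. Below is a minimal sketch of an expiry value stored in microseconds, which would justify a name like `expiresAtMicros`. The helpers are hypothetical, and the `<=` eviction rule follows the comments in the test suite quoted in this thread.

```python
MICROS_PER_MILLI = 1_000


def expires_at_micros(event_time_micros: int, delay_threshold_ms: int) -> int:
    """Expiry for a dedup state entry, expressed in microseconds."""
    return event_time_micros + delay_threshold_ms * MICROS_PER_MILLI


def should_evict(expires_at_us: int, watermark_ms: int) -> bool:
    """Compare in one unit: convert the millisecond watermark to microseconds first."""
    return expires_at_us <= watermark_ms * MICROS_PER_MILLI


expiry = expires_at_micros(event_time_micros=10_000_000, delay_threshold_ms=10_000)
print(expiry)                        # 20000000 (t = 20 s)
print(should_evict(expiry, 19_999))  # False
print(should_evict(expiry, 20_000))  # True
```

Suppose `expiry` were compared against the raw millisecond watermark without the conversion. The entry would then be kept about a thousand times longer than intended. An explicit unit suffix in the field name helps prevent that kind of mix-up.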
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1154700264 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ## @@ -980,3 +1022,65 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( +keyExpressions: Seq[Attribute], +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None, +eventTimeWatermarkForLateEvents: Option[Long] = None, +eventTimeWatermarkForEviction: Option[Long] = None) + extends BaseStreamingDeduplicateExec { + + protected val schemaForValueRow: StructType = StructType( +Array(StructField("expiresAt", LongType, nullable = false))) Review Comment: So the type of value in the state will be Long? SGTM.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1153915858 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.timestamp_seconds + +class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { + + import testImplicits._ + + test("deduplicate without event time column") { +def testAndVerify(df: Dataset[_]): Unit = { + val exc = intercept[AnalysisException] { +df.writeStream.format("noop").start() + } + + assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported")) Review Comment: Optional.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1153915713 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.timestamp_seconds + +class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { + + import testImplicits._ + + test("deduplicate without event time column") { +def testAndVerify(df: Dataset[_]): Unit = { + val exc = intercept[AnalysisException] { +df.writeStream.format("noop").start() + } + + assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported")) + assert(exc.getMessage.contains("streaming DataFrames/DataSets without watermark")) +} + +val inputData = MemoryStream[String] +val result = inputData.toDS().dropDuplicatesWithinWatermark() +testAndVerify(result) + +val result2 = inputData.toDS().withColumn("newcol", $"value") + .dropDuplicatesWithinWatermark("newcol") +testAndVerify(result2) + +val inputData2 = MemoryStream[(String, Int)] +val otherSideForJoin = inputData2.toDF() + .select($"_1" as "key", timestamp_seconds($"_2") as "time") + .withWatermark("Time", "10 seconds") + +val result3 = inputData.toDS() + .select($"value".as("key")) + // there are two streams which one stream only defines the watermark. the stream which + // contains dropDuplicatesWithinWatermark does not define the watermark, which is not + // supported. 
+ .dropDuplicatesWithinWatermark() + .join(otherSideForJoin, "key") +testAndVerify(result3) + } + + test("deduplicate with all columns with event time column") { +val inputData = MemoryStream[Int] +val result = inputData.toDS() + .withColumn("eventTime", timestamp_seconds($"value")) + .withWatermark("eventTime", "10 seconds") + .dropDuplicatesWithinWatermark() + .select($"eventTime".cast("long").as[Long]) + +testStream(result, Append)( + // Advance watermark to 5 secs, no-data-batch does not drop state rows + AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*), + CheckAnswer(10 to 15: _*), + assertNumStateRows(total = 6, updated = 6), + + // Advance watermark to 7 secs, no-data-batch does not drop state rows + AddData(inputData, (13 to 17): _*), + // 13 to 15 are duplicated + CheckNewAnswer(16, 17), + assertNumStateRows(total = 8, updated = 2), + + AddData(inputData, 5), // Should not emit anything as data less than watermark + CheckNewAnswer(), + assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1), + + // Advance watermark to 25 secs, no-data-batch drops state rows having expired time <= 25 + AddData(inputData, 35), + CheckNewAnswer(35), + assertNumStateRows(total = 3, updated = 1), + + // Advance watermark to 45 seconds, no-data-batch drops state rows having expired time <= 45 + AddData(inputData, 55), + CheckNewAnswer(55), + assertNumStateRows(total = 1, updated = 1) +) + } + + test("deduplicate with some columns with event time column") { Review Comment: Isn't this testing the case where the "eventTime" column is not one of the dedup columns?
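The expected values in the "all columns" test can be reproduced with a toy, single-partition model in pure Python. This is not Spark's implementation. It makes these assumptions:

- event times are in seconds, and the initial watermark is 0;
- each batch filters late rows using the watermark computed after the previous batch;
- state entries expire at `event_time + delay` and are evicted once `expires_at <= watermark`, as the test comments state;
- the reported state-row total is taken after the follow-up no-data batch.

The dedup key is passed separately from the event time. This also covers the question above: in the "some columns" test, the key `_1` does not include `eventTime`.

```python
from dataclasses import dataclass, field
from typing import Dict, Hashable, Iterable, List, Tuple

Row = Tuple[Hashable, int]  # (dedup key, event time in seconds)


@dataclass
class BatchResult:
    emitted: List[Row]
    updated: int
    total: int
    dropped_by_watermark: int


@dataclass
class DedupWithinWatermarkModel:
    delay: int  # watermark delay threshold, in seconds
    watermark: int = 0  # watermark applied to late rows in the next batch
    state: Dict[Hashable, int] = field(default_factory=dict)  # key -> expires_at

    def run_batch(self, rows: Iterable[Row]) -> BatchResult:
        rows = list(rows)
        emitted: List[Row] = []
        dropped = 0
        for key, event_time in rows:
            if event_time <= self.watermark:
                dropped += 1  # late row, dropped before deduplication
            elif key not in self.state:
                self.state[key] = event_time + self.delay
                emitted.append((key, event_time))
        if rows:
            max_event_time = max(t for _, t in rows)
            self.watermark = max(self.watermark, max_event_time - self.delay)
        # Follow-up no-data batch: evict entries whose expiry is at or behind the watermark.
        self.state = {k: exp for k, exp in self.state.items() if exp > self.watermark}
        return BatchResult(emitted, len(emitted), len(self.state), dropped)


# "deduplicate with all columns with event time column": key and event time are both `value`.
m = DedupWithinWatermarkModel(delay=10)
r1 = m.run_batch([(v, v) for _ in range(5) for v in range(10, 16)])  # emits 10..15; total 6
r2 = m.run_batch([(v, v) for v in range(13, 18)])  # emits 16, 17; total 8
r3 = m.run_batch([(5, 5)])  # late row: nothing emitted; total 8, one dropped
r4 = m.run_batch([(35, 35)])  # emits 35; watermark 25 evicts 10..15; total 3
r5 = m.run_batch([(55, 55)])  # emits 55; watermark 45 evicts 16, 17, 35; total 1

# Key is `_1` only, so eventTime is not part of the dedup key (illustrative second step).
m2 = DedupWithinWatermarkModel(delay=2)
s1 = m2.run_batch([("a", 17)])  # emits ("a", 17); expires_at 19, watermark 15
s2 = m2.run_batch([("a", 18)])  # same key, different event time: dropped as a duplicate
```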
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1153915130 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.timestamp_seconds + +class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { + + import testImplicits._ + + test("deduplicate without event time column") { +def testAndVerify(df: Dataset[_]): Unit = { + val exc = intercept[AnalysisException] { +df.writeStream.format("noop").start() + } + + assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported")) + assert(exc.getMessage.contains("streaming DataFrames/DataSets without watermark")) +} + +val inputData = MemoryStream[String] +val result = inputData.toDS().dropDuplicatesWithinWatermark() +testAndVerify(result) + +val result2 = inputData.toDS().withColumn("newcol", $"value") + .dropDuplicatesWithinWatermark("newcol") +testAndVerify(result2) + +val inputData2 = MemoryStream[(String, Int)] +val otherSideForJoin = inputData2.toDF() + .select($"_1" as "key", timestamp_seconds($"_2") as "time") + .withWatermark("Time", "10 seconds") + +val result3 = inputData.toDS() + .select($"value".as("key")) + // there are two streams which one stream only defines the watermark. the stream which + // contains dropDuplicatesWithinWatermark does not define the watermark, which is not + // supported. 
+ .dropDuplicatesWithinWatermark() + .join(otherSideForJoin, "key") +testAndVerify(result3) + } + + test("deduplicate with all columns with event time column") { +val inputData = MemoryStream[Int] +val result = inputData.toDS() + .withColumn("eventTime", timestamp_seconds($"value")) + .withWatermark("eventTime", "10 seconds") + .dropDuplicatesWithinWatermark() + .select($"eventTime".cast("long").as[Long]) + +testStream(result, Append)( + // Advance watermark to 5 secs, no-data-batch does not drop state rows + AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*), + CheckAnswer(10 to 15: _*), + assertNumStateRows(total = 6, updated = 6), + + // Advance watermark to 7 secs, no-data-batch does not drop state rows + AddData(inputData, (13 to 17): _*), + // 13 to 15 are duplicated + CheckNewAnswer(16, 17), + assertNumStateRows(total = 8, updated = 2), + + AddData(inputData, 5), // Should not emit anything as data less than watermark + CheckNewAnswer(), + assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1), + + // Advance watermark to 25 secs, no-data-batch drops state rows having expired time <= 25 + AddData(inputData, 35), + CheckNewAnswer(35), + assertNumStateRows(total = 3, updated = 1), + + // Advance watermark to 45 seconds, no-data-batch drops state rows having expired time <= 45 + AddData(inputData, 55), + CheckNewAnswer(55), + assertNumStateRows(total = 1, updated = 1) +) + } + + test("deduplicate with some columns with event time column") { +val inputData = MemoryStream[(String, Int)] +val result = inputData.toDS() + .withColumn("eventTime", timestamp_seconds($"_2")) + .withWatermark("eventTime", "2 seconds") + .dropDuplicatesWithinWatermark("_1") + .select($"_1", $"eventTime".cast("long").as[Long]) + +testStream(result, Append)( + // Advances watermark to 15 + AddData(inputData, "a" -> 17), + CheckNewAnswer("a" -> 17), + // expired time is set to 19 + assertNumStateRows(total = 1, updated = 1), + + // Watermark does not advance + 
AddData(inputData, "a" -> 16), + CheckNewAnswer(), + assertNumStateRows(total = 1, updated = 0), + + //
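The three cases in `deduplicate without event time column` are all expected to fail the same analysis-time check, which is quoted from `UnsupportedOperationChecker` further down the thread: the child of the dedup node must expose at least one column carrying watermark metadata. The sketch below models that check in simplified form. `Attr`, `DelayKey` and `check` are hypothetical names; the real code inspects `EventTimeWatermark.delayKey` on Catalyst attributes, and the error text here is only assembled from the two substrings the test asserts on.

```scala
// Hypothetical, simplified model of the analysis-time check; not Catalyst code.
object WatermarkRequirementCheck {
  final case class Attr(name: String, metadata: Map[String, String] = Map.empty)

  // Stand-in for EventTimeWatermark.delayKey; the real key string is not shown here.
  val DelayKey = "hypothetical.watermarkDelayKey"

  // Mirrors the quoted checker: collect the child's output attributes that carry
  // watermark metadata, and reject the plan if there are none.
  def check(childOutput: Seq[Attr]): Either[String, Unit] = {
    val watermarkAttributes = childOutput.filter(_.metadata.contains(DelayKey))
    if (watermarkAttributes.nonEmpty) Right(())
    else Left("dropDuplicatesWithinWatermark is not supported on streaming " +
      "DataFrames/DataSets without watermark")
  }
}
```

In the join case (`result3`), the watermark is defined only on the other side of the join, so the dedup node's own child output (just `key`) carries no watermark metadata and the check fails, as the test expects.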
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1153914621 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1153675952 ## sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala: ## @@ -1742,6 +1742,8 @@ class DataFrameSuite extends QueryTest Seq(Row(2, 1, 2), Row(1, 2, 1), Row(1, 1, 1), Row(2, 2, 2))) } + // FIXME: add dropDuplicatesWithinWatermark Review Comment: Any reason to postpone it? ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.timestamp_seconds + +class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { + + import testImplicits._ + + test("deduplicate without event time column") { +def testAndVerify(df: Dataset[_]): Unit = { + val exc = intercept[AnalysisException] { +df.writeStream.format("noop").start() + } + + assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported")) + assert(exc.getMessage.contains("streaming DataFrames/DataSets without watermark")) +} + +val inputData = MemoryStream[String] +val result = inputData.toDS().dropDuplicatesWithinWatermark() +testAndVerify(result) + +val result2 = inputData.toDS().withColumn("newcol", $"value") + .dropDuplicatesWithinWatermark("newcol") +testAndVerify(result2) + +val inputData2 = MemoryStream[(String, Int)] +val otherSideForJoin = inputData2.toDF() + .select($"_1" as "key", timestamp_seconds($"_2") as "time") + .withWatermark("Time", "10 seconds") + +val result3 = inputData.toDS() + .select($"value".as("key")) + // there are two streams which one stream only defines the watermark. the stream which + // contains dropDuplicatesWithinWatermark does not define the watermark, which is not + // supported. 
+ .dropDuplicatesWithinWatermark() + .join(otherSideForJoin, "key") +testAndVerify(result3) + } + + test("deduplicate with all columns with event time column") { +val inputData = MemoryStream[Int] +val result = inputData.toDS() + .withColumn("eventTime", timestamp_seconds($"value")) + .withWatermark("eventTime", "10 seconds") + .dropDuplicatesWithinWatermark() + .select($"eventTime".cast("long").as[Long]) + +testStream(result, Append)( + // Advance watermark to 5 secs, no-data-batch does not drop state rows + AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*), + CheckAnswer(10 to 15: _*), + assertNumStateRows(total = 6, updated = 6), + + // Advance watermark to 7 secs, no-data-batch does not drop state rows + AddData(inputData, (13 to 17): _*), + // 13 to 15 are duplicated + CheckNewAnswer(16, 17), + assertNumStateRows(total = 8, updated = 2), + + AddData(inputData, 5), // Should not emit anything as data less than watermark + CheckNewAnswer(), + assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1), + + // Advance watermark to 25 secs, no-data-batch drops state rows having expired time <= 25 + AddData(inputData, 35), + CheckNewAnswer(35), + assertNumStateRows(total = 3, updated = 1), + + // Advance watermark to 45 seconds, no-data-batch drops state rows having expired time <= 45 + AddData(inputData, 55), + CheckNewAnswer(55), + assertNumStateRows(total = 1, updated = 1) +) + } + + test("deduplicate with some columns with event time column") { Review Comment: without including the event_time column, right? ## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1153505179 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] { d.withNewChildren(Seq(simplifyUnion(u))) case d @ Deduplicate(_, u: Union) => d.withNewChildren(Seq(simplifyUnion(u))) +case d @ DeduplicateWithinWatermark(_, u: Union) => Review Comment: @zsxwing one more thing to consider: shall we extend the existing 'DropDuplicate' node to support this use case rather than introducing another logical node, 'DropDuplicatesWithinWatermark'? It does not change the API, just the implementation. I am suggesting adding this option to the existing node.
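The suggestion above can be illustrated with toy plan nodes. In the quoted diff, `RemoveNoopUnion` needs a new `case` for `DeduplicateWithinWatermark`; if the existing node instead carried a flag, the existing case would cover both variants. The classes below are hypothetical, heavily simplified stand-ins, not Catalyst's, and `simplifyUnion` is only a rough approximation of the real helper.

```scala
// Toy, heavily simplified plan nodes: hypothetical stand-ins, not Catalyst classes.
object DedupNodeDesign {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan
  // One node carrying a flag instead of a separate DeduplicateWithinWatermark node.
  final case class Deduplicate(
      keys: Seq[String],
      child: Plan,
      withinWatermark: Boolean = false) extends Plan

  // Simplified stand-in for RemoveNoopUnion's simplifyUnion: under a dedup node,
  // identical Union children are redundant; a single remaining child replaces the Union.
  private def simplifyUnion(u: Union): Plan = u.children.distinct match {
    case Seq(only) => only
    case unique    => Union(unique)
  }

  // A single case handles both variants; copy() preserves the withinWatermark flag.
  def removeNoopUnion(plan: Plan): Plan = plan match {
    case d @ Deduplicate(_, u: Union, _) => d.copy(child = simplifyUnion(u))
    case other                           => other
  }
}
```

The trade-off cuts both ways: every rule that already matches the plain dedup node would silently apply to the watermark variant too, so any rule that is only valid for plain deduplication would have to check the flag explicitly.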
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1151482426 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] { d.withNewChildren(Seq(simplifyUnion(u))) case d @ Deduplicate(_, u: Union) => d.withNewChildren(Seq(simplifyUnion(u))) +case d @ DeduplicateWithinWatermark(_, u: Union) => Review Comment: There is no perfect watermark, and any guarantees that apply only with a "perfect watermark" are no guarantees at all. I don't think semantic differences require disabling batch.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1151447031 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ## @@ -980,3 +980,117 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( +keyExpressions: Seq[Attribute], +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None, +eventTimeWatermarkForLateEvents: Option[Long] = None, +eventTimeWatermarkForEviction: Option[Long] = None) + extends UnaryExecNode with StateStoreWriter with WatermarkSupport { + + /** Distribute by grouping attributes */ + override def requiredChildDistribution: Seq[Distribution] = { +StatefulOperatorPartitioning.getCompatibleDistribution( + keyExpressions, getStateInfo, conf) :: Nil + } + + private val schemaForTimeoutRow: StructType = StructType( +Array(StructField("expiresAt", LongType, nullable = false))) + private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, +allowMultipleEventTimeColumns = false).get + private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + + override protected def doExecute(): RDD[InternalRow] = { +metrics // force lazy init at driver + +child.execute().mapPartitionsWithStateStore( + getStateInfo, + keyExpressions.toStructType, + schemaForTimeoutRow, + numColsPrefixKey = 0, + session.sessionState, + Some(session.streams.stateStoreCoordinator)) { (store, iter) => + val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) + + val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow) + val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow)) + + val numOutputRows = longMetric("numOutputRows") + val 
numUpdatedStateRows = longMetric("numUpdatedStateRows") + val numRemovedStateRows = longMetric("numRemovedStateRows") + val allUpdatesTimeMs = longMetric("allUpdatesTimeMs") + val allRemovalsTimeMs = longMetric("allRemovalsTimeMs") + val commitTimeMs = longMetric("commitTimeMs") + val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows") + + val baseIterator = watermarkPredicateForDataForLateEvents match { +case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate) +case None => iter + } + + val updatesStartTimeNs = System.nanoTime + + val result = baseIterator.filter { r => +val row = r.asInstanceOf[UnsafeRow] +val key = getKey(row) +val value = store.get(key) +if (value == null) { + val timestamp = row.getLong(eventTimeColOrdinal) + // The unit of timestamp in Spark is microseconds, convert the delay threshold. + val expiresAt = timestamp + delayThresholdMillis * 1000 + + timeoutRow.setLong(0, expiresAt) + store.put(key, timeoutRow) + + numUpdatedStateRows += 1 + numOutputRows += 1 + true +} else { + // Drop duplicated rows + numDroppedDuplicateRows += 1 + false +} + } + + CompletionIterator[InternalRow, Iterator[InternalRow]](result, { +allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs) +allRemovalsTimeMs += timeTakenMs { + // Convert watermark value to microsecond + val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000 + store.iterator().foreach { rowPair => Review Comment: It depends on the operator. E.g., for windowed aggregation, if we keep the window as the first column and the state store supports an efficient range iterator, it could minimize scanning.
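The range-iterator idea in the comment above can be sketched as follows: if state is ordered so that expired entries form a prefix, eviction can stop at the first unexpired entry instead of visiting every row. Here a `TreeSet` ordered by `(expiresAt, key)` stands in for a state store with an efficient ordered iterator. This is a hypothetical illustration, not Spark's `StateStore` API.

```scala
import scala.collection.mutable

// Hypothetical illustration of range-based eviction; not Spark's StateStore API.
object ExpiryIndexedState {
  final class State {
    private val expiresAtByKey = mutable.Map.empty[String, Long]
    // Secondary index ordered by (expiresAt, key), so expired entries form a prefix.
    private val byExpiry = mutable.TreeSet.empty[(Long, String)]

    def size: Int = expiresAtByKey.size

    def put(key: String, expiresAt: Long): Unit = {
      expiresAtByKey.get(key).foreach(old => byExpiry -= ((old, key)))
      expiresAtByKey(key) = expiresAt
      byExpiry += ((expiresAt, key))
    }

    // Full scan: every entry is visited, as when iterating the whole store.
    // Returns (entries evicted, entries visited).
    def evictByFullScan(watermark: Long): (Int, Int) = {
      val visited = expiresAtByKey.size
      val expired = expiresAtByKey.toList.filter { case (_, exp) => exp <= watermark }
      expired.foreach { case (key, exp) =>
        expiresAtByKey -= key
        byExpiry -= ((exp, key))
      }
      (expired.size, visited)
    }

    // Range scan: walk the ordered index and stop at the first unexpired entry.
    // Returns (entries evicted, entries visited).
    def evictByRange(watermark: Long): (Int, Int) = {
      var visited = 0
      val expired = byExpiry.iterator.takeWhile { case (exp, _) =>
        visited += 1
        exp <= watermark
      }.toList
      expired.foreach { case entry @ (_, key) =>
        byExpiry -= entry
        expiresAtByKey -= key
      }
      (expired.size, visited)
    }
  }
}
```

With the nine entries the all-columns test holds before the watermark reaches 25 (expiry 20 to 27, plus 45), both methods evict six rows, but the range scan visits 7 entries instead of 9. The gap grows with the amount of unexpired state. Keeping a second index costs extra writes, which is the usual price for this layout.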
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1151443903 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] { d.withNewChildren(Seq(simplifyUnion(u))) case d @ Deduplicate(_, u: Union) => d.withNewChildren(Seq(simplifyUnion(u))) +case d @ DeduplicateWithinWatermark(_, u: Union) => Review Comment: dropDuplicates() produces different output between batch and streaming too (because batch does not drop late records). As much as possible, it is better to keep the API the same between batch and streaming.
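The point that `dropDuplicates()` already differs between batch and streaming can be shown with two events: a static batch has no watermark, so nothing is late, while a stream drops a row that arrives at or below the current watermark. The functions below are a hypothetical model, not Spark code. State is never evicted, so only the effect of late-row dropping remains.

```scala
import scala.collection.mutable

// Hypothetical comparison (not Spark code). Times are in seconds; state is never
// evicted here, so only the effect of dropping late rows is shown.
object BatchVsStreamingDedup {
  final case class Event(key: String, eventTime: Long)

  // Static batch: there is no watermark, so no row is late; keep each key's first row.
  def batchDedup(events: Seq[Event]): List[Event] = {
    val seen = mutable.Set.empty[String]
    events.filter(e => seen.add(e.key)).toList
  }

  // Streaming: rows at or below the current watermark are dropped before dedup.
  def streamingDedup(batches: Seq[Seq[Event]], delaySec: Long): List[Event] = {
    val seen = mutable.Set.empty[String]
    var watermark = 0L
    val out = List.newBuilder[Event]
    batches.foreach { batch =>
      batch.foreach { e =>
        if (e.eventTime > watermark && seen.add(e.key)) out += e
      }
      if (batch.nonEmpty) {
        watermark = math.max(watermark, batch.map(_.eventTime).max - delaySec)
      }
    }
    out.result()
  }
}
```

For input batches `[("a", 20)]` then `[("b", 5)]` with a 10-second delay, the batch version keeps both rows, while the streaming version drops ("b", 5) because the watermark has already advanced to 10.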
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1151397406 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ## @@ -980,3 +980,117 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( +keyExpressions: Seq[Attribute], +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None, +eventTimeWatermarkForLateEvents: Option[Long] = None, +eventTimeWatermarkForEviction: Option[Long] = None) + extends UnaryExecNode with StateStoreWriter with WatermarkSupport { + + /** Distribute by grouping attributes */ + override def requiredChildDistribution: Seq[Distribution] = { +StatefulOperatorPartitioning.getCompatibleDistribution( + keyExpressions, getStateInfo, conf) :: Nil + } + + private val schemaForTimeoutRow: StructType = StructType( +Array(StructField("expiresAt", LongType, nullable = false))) + private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, +allowMultipleEventTimeColumns = false).get + private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + + override protected def doExecute(): RDD[InternalRow] = { +metrics // force lazy init at driver + +child.execute().mapPartitionsWithStateStore( + getStateInfo, + keyExpressions.toStructType, + schemaForTimeoutRow, + numColsPrefixKey = 0, + session.sessionState, + Some(session.streams.stateStoreCoordinator)) { (store, iter) => + val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) + + val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow) + val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow)) + + val numOutputRows = longMetric("numOutputRows") + val 
numUpdatedStateRows = longMetric("numUpdatedStateRows") + val numRemovedStateRows = longMetric("numRemovedStateRows") + val allUpdatesTimeMs = longMetric("allUpdatesTimeMs") + val allRemovalsTimeMs = longMetric("allRemovalsTimeMs") + val commitTimeMs = longMetric("commitTimeMs") + val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows") + + val baseIterator = watermarkPredicateForDataForLateEvents match { +case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate) +case None => iter + } + + val updatesStartTimeNs = System.nanoTime + + val result = baseIterator.filter { r => +val row = r.asInstanceOf[UnsafeRow] +val key = getKey(row) +val value = store.get(key) +if (value == null) { + val timestamp = row.getLong(eventTimeColOrdinal) + // The unit of timestamp in Spark is microseconds, convert the delay threshold. + val expiresAt = timestamp + delayThresholdMillis * 1000 + + timeoutRow.setLong(0, expiresAt) + store.put(key, timeoutRow) + + numUpdatedStateRows += 1 + numOutputRows += 1 + true +} else { + // Drop duplicated rows + numDroppedDuplicateRows += 1 + false +} + } + + CompletionIterator[InternalRow, Iterator[InternalRow]](result, { +allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs) +allRemovalsTimeMs += timeTakenMs { + // Convert watermark value to microsecond + val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000 + store.iterator().foreach { rowPair => Review Comment: [Unrelated to this PR] Do we scan the whole state in every microbatch? Probably same in all the stateful operators? 
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ## @@ -980,3 +980,117 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( +keyExpressions: Seq[Attribute], +child: SparkPlan, +stateInfo: Option[StatefulOperatorStateInfo] = None, +eventTimeWatermarkForLateEvents: Option[Long] = None, +eventTimeWatermarkForEviction: Option[Long] = None) + extends UnaryExecNode with StateStoreWriter with WatermarkSupport { + + /** Distribute by grouping attributes */ + override def requiredChildDistribution: Seq[Distribution] = { +StatefulOperatorPartitioning.getCompatibleDistribution( + keyExpressions, getStateInfo, conf) :: Nil + } + + private val schemaForTimeoutRow: StructType = StructType( +Array(StructField("expiresAt", LongType, nullable = false))) + private
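The quoted exec code mixes units: event-time values are microseconds, while the watermark delay threshold and the watermark itself are milliseconds, hence the `* 1000` conversions. As a worked example, the "some columns" test uses event time 17 s (17,000,000 µs) and a 2 s delay (2,000 ms), giving expiresAt = 17,000,000 + 2,000 × 1,000 = 19,000,000 µs, the "expired time is set to 19" noted in that test. The helpers below are hypothetical, not Spark code. The eviction test assumes the "expired time <= watermark" rule from the test comments, since the quoted code is cut off before the comparison.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helpers, not Spark code. Event-time values are microseconds; the
// delay threshold and the watermark are milliseconds, as the quoted comments describe.
object ExpiryUnits {
  // expiresAt = eventTime + delay, with the delay converted ms -> us (same as * 1000).
  def expiresAtMicros(eventTimeMicros: Long, delayThresholdMillis: Long): Long =
    eventTimeMicros + TimeUnit.MILLISECONDS.toMicros(delayThresholdMillis)

  // Eviction test, assuming the "expired time <= watermark" rule from the test comments.
  def isExpired(expiresAtMicros: Long, watermarkMillis: Long): Boolean =
    expiresAtMicros <= TimeUnit.MILLISECONDS.toMicros(watermarkMillis)
}
```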
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1151374343 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ## @@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging { throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset") } +case d: DeduplicateWithinWatermark if d.isStreaming => + // Find any attributes that are associated with an eventTime watermark. + val watermarkAttributes = d.child.output.collect { +case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a + } + + // DeduplicateWithinWatermark requires event time column being set in the input DataFrame + if (watermarkAttributes.isEmpty) { +throwError( + "dropDuplicatesWithinWatermark is not supported on streaming DataFrames/DataSets " + Review Comment: [optional] "dropDuplicatesWithinWatermark() requires watermark to be set on DataFrame, but there is no watermark set." ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -3038,6 +3038,118 @@ class Dataset[T] private[sql]( dropDuplicates(colNames) } + /** + * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows + * are within delay threshold of watermark. + * + * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be + * set via [[withWatermark]]. + * + * This will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the following, "If event time of the first arrived event is + * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time + * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to + * set the delay threshold of watermark longer than max timestamp differences among duplicated + * events.
+ * + * In addition, too late data older than watermark will be dropped to avoid any possibility + * of duplicates. + * + * @group typedrel + * @since 3.5.0 + */ + def dropDuplicatesWithinWatermark(): Dataset[T] = { +dropDuplicatesWithinWatermark(this.columns) + } + + /** + * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, + * as long as event times of duplicated rows are within delay threshold of watermark. Review Comment: I know it is a tricky thing, but it might be better to rephrase. ## docs/structured-streaming-programming-guide.md: ## @@ -2132,6 +2132,48 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds") streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime") {% endhighlight %} + Review Comment: [From PR description] > Only guarantee to deduplicate events within the watermark. 'within watermark delay' ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -3038,6 +3038,118 @@ class Dataset[T] private[sql]( dropDuplicates(colNames) } + /** + * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows + * are within delay threshold of watermark. + * + * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be + * set via [[withWatermark]]. + * + * This will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the following, "If event time of the first arrived event is + * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time + * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to + * set the delay threshold of watermark longer than max timestamp differences among duplicated + * events. + * + * In addition, too late data older than watermark will be dropped to avoid any possibility + * of duplicates. 
+ * + * @group typedrel + * @since 3.5.0 + */ + def dropDuplicatesWithinWatermark(): Dataset[T] = { +dropDuplicatesWithinWatermark(this.columns) + } + + /** + * Returns a new Dataset with duplicates rows removed, considering only the subset of columns, + * as long as event times of duplicated rows are within delay threshold of watermark. + * + * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be + * set via [[withWatermark]]. + * + * This will keep all data across triggers as intermediate state to drop duplicated rows. The + * state will be kept to guarantee the following, "If event time of the first arrived event is + * 'ts', this guarantees all duplicated rows will be dropped