rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151374343


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging {
               throwError(s"Join type $joinType is not supported with streaming 
DataFrame/Dataset")
           }
 
+        case d: DeduplicateWithinWatermark if d.isStreaming =>
+          // Find any attributes that are associated with an eventTime 
watermark.
+          val watermarkAttributes = d.child.output.collect {
+            case a: Attribute if 
a.metadata.contains(EventTimeWatermark.delayKey) => a
+          }
+
+          // DeduplicateWithinWatermark requires event time column being set 
in the input DataFrame
+          if (watermarkAttributes.isEmpty) {
+            throwError(
+              "dropDuplicatesWithinWatermark is not supported on streaming 
DataFrames/DataSets " +

Review Comment:
   [optional]
   "dropDuplicatesWithinWatermark() requires a watermark to be set on the
   DataFrame, but there is no watermark set."



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.

Review Comment:
   I know it is a tricky thing, but it might be better to rephrase. 



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2132,6 +2132,48 @@ streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+</div>

Review Comment:
   [From PR description]
   > Only guarantee to deduplicate events within the watermark.
   
   Suggest "within the watermark delay" instead.
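
   For reference, a hedged Scala sketch of the usage the new guide section documents (column names
   and the delay value are placeholders, mirroring the guide's existing dropDuplicates example):
   ```scala
   // Placeholder columns/threshold, following the style of the guide's existing example.
   val streamingDf = spark.readStream. ...  // columns: guid, eventTime
   streamingDf
     .withWatermark("eventTime", "10 seconds")      // a watermark must be set first
     .dropDuplicatesWithinWatermark(Seq("guid"))    // deduplicate only within the watermark delay
   ```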



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to

Review Comment:
   I don't think we guarantee this condition: `(ts - delay threshold, ts + delay threshold)`.
   We likely need to rephrase it. We can take another look at the scaladoc towards the end, before
   merging this.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = withTypedPlan {
+    val resolver = sparkSession.sessionState.analyzer.resolver

Review Comment:
   Can we share this with dropDuplicates()? Or, even better, can we reuse the 'Deduplicate()'
   node(s)?
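
   As a rough illustration of the sharing suggested here, a hypothetical private helper (the name
   `resolveDedupColumns` is invented, not from the PR) that both dropDuplicates() and
   dropDuplicatesWithinWatermark() could call before building their logical nodes:
   ```scala
   // Hypothetical helper: resolve the deduplication columns once, shared by both APIs.
   private def resolveDedupColumns(colNames: Seq[String]): Seq[Attribute] = {
     val resolver = sparkSession.sessionState.analyzer.resolver
     val allColumns = queryExecution.analyzed.output
     colNames.toSet.toSeq.flatMap { colName =>
       // More than one column may match the same name, so keep all matches.
       allColumns.filter(attr => resolver(attr.name, colName))
     }
   }
   ```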



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      keyExpressions.toStructType,
+      schemaForTimeoutRow,
+      numColsPrefixKey = 0,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+
+      val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+      val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow))
+
+      val numOutputRows = longMetric("numOutputRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+      val numRemovedStateRows = longMetric("numRemovedStateRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+      val baseIterator = watermarkPredicateForDataForLateEvents match {
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
+        case None => iter
+      }
+
+      val updatesStartTimeNs = System.nanoTime
+
+      val result = baseIterator.filter { r =>

Review Comment:
   If we reuse the Deduplicate() logical node, mainly this part and the removal would differ based
   on a 'dropWithinWatermark' flag. That way most of the remaining code remains unchanged.
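
   Roughly, that could look like the sketch below; `dropWithinWatermark` is a hypothetical flag on
   the operator, `row`/`key` are assumed to be extracted from the iterator element as in
   StreamingDeduplicateExec, and time-unit conversion between the event-time column and the delay
   threshold is elided:
   ```scala
   // Sketch only: pick the state value per key based on a hypothetical flag, so the
   // surrounding filter/removal logic can stay shared with StreamingDeduplicateExec.
   val valueForKey = if (dropWithinWatermark) {
     // Remember when this key's state can be evicted: first event time + delay threshold.
     timeoutRow.setLong(0, row.getLong(eventTimeColOrdinal) + delayThresholdMillis)
     timeoutRow
   } else {
     // Plain dropDuplicates only stores a dummy marker row for the key.
     StreamingDeduplicateExec.EMPTY_ROW  // currently private; would need to be shared
   }
   store.put(key, valueForKey)
   ```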



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the subset of columns,
+   * as long as event times of duplicated rows are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * This will keep all data across triggers as intermediate state to drop duplicated rows. The
+   * state will be kept to guarantee the following, "If event time of the first arrived event is
+   * 'ts', this guarantees all duplicated rows will be dropped where these rows are within the time
+   * range of (ts - delay threshold, ts + delay threshold)". In practice, users are encouraged to
+   * set the delay threshold of watermark longer than max timestamp differences among duplicated
+   * events.
+   *
+   * In addition, too late data older than watermark will be dropped to avoid any possibility
+   * of duplicates.

Review Comment:
   Same here. 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
       d.withNewChildren(Seq(simplifyUnion(u)))
     case d @ Deduplicate(_, u: Union) =>
       d.withNewChildren(Seq(simplifyUnion(u)))
+    case d @ DeduplicateWithinWatermark(_, u: Union) =>

Review Comment:
   Rather than making this a separate logical node, can we make the new behavior an option on the
   Deduplicate node? That way we don't need to distinguish them in the implementation except in a
   couple of places.
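
   A hedged sketch of that alternative (the `withinWatermark` parameter name is hypothetical, not
   from the PR), keeping a single logical node:
   ```scala
   // Sketch: a flag on the existing node instead of a separate DeduplicateWithinWatermark node.
   case class Deduplicate(
       keys: Seq[Attribute],
       child: LogicalPlan,
       withinWatermark: Boolean = false) extends UnaryNode {
     override def maxRows: Option[Long] = child.maxRows
     override def output: Seq[Attribute] = child.output
     override protected def withNewChildInternal(newChild: LogicalPlan): Deduplicate =
       copy(child = newChild)
   }
   ```
   Existing patterns such as `case d @ Deduplicate(_, u: Union)` would then need an extra wildcard
   for the new field, but the handling itself could stay shared.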



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, as long as event times of duplicated rows
+   * are within delay threshold of watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input [[Dataset]] must be

Review Comment:
   Also mentioned above: `withWatermark()` is allowed in batch, so in the same way this could be
   allowed too. Essentially, in batch this is the same as a normal dropDuplicates().
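
   As a hedged sketch of what that could look like (a hypothetical rule, not part of the PR): in a
   batch query the within-watermark variant could simply be rewritten to a plain Deduplicate,
   mirroring how `withWatermark()` is a no-op in batch:
   ```scala
   // Hypothetical optimizer rule: batch queries fall back to a normal deduplication.
   object BatchDeduplicateWithinWatermarkFallback extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       case d @ DeduplicateWithinWatermark(keys, child) if !d.isStreaming =>
         Deduplicate(keys, child)
     }
   }
   ```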



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

