HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151453220


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+  /** Distribute by grouping attributes */
+  override def requiredChildDistribution: Seq[Distribution] = {
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      keyExpressions, getStateInfo, conf) :: Nil
+  }
+
+  private val schemaForTimeoutRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    metrics // force lazy init at driver
+
+    child.execute().mapPartitionsWithStateStore(
+      getStateInfo,
+      keyExpressions.toStructType,
+      schemaForTimeoutRow,
+      numColsPrefixKey = 0,
+      session.sessionState,
+      Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+      val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+
+      val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+      val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow))
+
+      val numOutputRows = longMetric("numOutputRows")
+      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+      val numRemovedStateRows = longMetric("numRemovedStateRows")
+      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+      val commitTimeMs = longMetric("commitTimeMs")
+      val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+      val baseIterator = watermarkPredicateForDataForLateEvents match {
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
+        case None => iter
+      }
+
+      val updatesStartTimeNs = System.nanoTime
+
+      val result = baseIterator.filter { r =>
+        val row = r.asInstanceOf[UnsafeRow]
+        val key = getKey(row)
+        val value = store.get(key)
+        if (value == null) {
+          val timestamp = row.getLong(eventTimeColOrdinal)
+          // Timestamps in Spark are in microseconds; convert the delay
+          // threshold from milliseconds accordingly.
+          val expiresAt = timestamp + delayThresholdMillis * 1000
+
+          timeoutRow.setLong(0, expiresAt)
+          store.put(key, timeoutRow)
+
+          numUpdatedStateRows += 1
+          numOutputRows += 1
+          true
+        } else {
+          // Drop duplicated rows
+          numDroppedDuplicateRows += 1
+          false
+        }
+      }
+
+      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
+        allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
+        allRemovalsTimeMs += timeTakenMs {
+          // Convert the watermark value to microseconds
+          val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000
+          store.iterator().foreach { rowPair =>

Review Comment:
   Yeah, we leverage range scan in the native support of session windows with
the RocksDB state store provider. It might enable us to apply further
optimization here as well, although we should still be very careful about
changing the state format.
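
   To make the range scan idea concrete, here is a purely illustrative sketch with hypothetical types (not the actual StateStore API): if state rows were ordered by an expiration-timestamp prefix, eviction could become a bounded range scan instead of the full `store.iterator()` pass in this hunk, similar in spirit to the session window support.

```scala
// Hypothetical interface for illustration only; the real StateStore API and
// key encoding differ.
trait OrderedStateStore {
  // Relies on the provider (e.g. RocksDB) keeping entries sorted by expiresAt.
  def rangeScanBelow(boundMicros: Long): Iterator[(Long, Array[Byte])]
  def remove(expiresAtMicros: Long, key: Array[Byte]): Unit
}

def evictExpired(store: OrderedStateStore, watermarkMicros: Long): Unit = {
  // Touches only the expired entries instead of every state row.
  store.rangeScanBelow(watermarkMicros).foreach { case (expiresAt, key) =>
    store.remove(expiresAt, key)
  }
}
```

   The catch is that the dedup lookup is by key, so a timestamp-ordered layout would need key re-encoding or a secondary index, which is exactly the kind of state format change to be careful about.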


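   For context, a minimal sketch of how this operator is expected to be reached from the user-facing API (assuming the `dropDuplicatesWithinWatermark` API this PR wires up; the rate source, key column, and delay below are placeholders for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Placeholder source; any streaming source with an event-time column works.
val events = spark.readStream
  .format("rate")
  .load()
  .withColumn("eventId", expr("CAST(value % 100 AS STRING)"))
  // The watermark delay bounds how long each key is kept in the state store:
  // expiresAt = eventTime + delay (delayThresholdMillis in the operator above).
  .withWatermark("timestamp", "10 seconds")

// Duplicates of the same eventId arriving within the watermark delay are
// dropped; this should plan into StreamingDeduplicateWithinWatermarkExec.
val deduped = events.dropDuplicatesWithinWatermark("eventId")
```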

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

