HeartSaVioR commented on code in PR #40561: URL: https://github.com/apache/spark/pull/40561#discussion_r1151453220
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -980,3 +980,117 @@ object StreamingDeduplicateExec { private val EMPTY_ROW = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null)) } + +case class StreamingDeduplicateWithinWatermarkExec( + keyExpressions: Seq[Attribute], + child: SparkPlan, + stateInfo: Option[StatefulOperatorStateInfo] = None, + eventTimeWatermarkForLateEvents: Option[Long] = None, + eventTimeWatermarkForEviction: Option[Long] = None) + extends UnaryExecNode with StateStoreWriter with WatermarkSupport { + + /** Distribute by grouping attributes */ + override def requiredChildDistribution: Seq[Distribution] = { + StatefulOperatorPartitioning.getCompatibleDistribution( + keyExpressions, getStateInfo, conf) :: Nil + } + + private val schemaForTimeoutRow: StructType = StructType( + Array(StructField("expiresAt", LongType, nullable = false))) + private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, + allowMultipleEventTimeColumns = false).get + private val delayThresholdMillis = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + + override protected def doExecute(): RDD[InternalRow] = { + metrics // force lazy init at driver + + child.execute().mapPartitionsWithStateStore( + getStateInfo, + keyExpressions.toStructType, + schemaForTimeoutRow, + numColsPrefixKey = 0, + session.sessionState, + Some(session.streams.stateStoreCoordinator)) { (store, iter) => + val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) + + val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow) + val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForTimeoutRow)) + + val numOutputRows = longMetric("numOutputRows") + val numUpdatedStateRows = longMetric("numUpdatedStateRows") + val numRemovedStateRows = 
longMetric("numRemovedStateRows") + val allUpdatesTimeMs = longMetric("allUpdatesTimeMs") + val allRemovalsTimeMs = longMetric("allRemovalsTimeMs") + val commitTimeMs = longMetric("commitTimeMs") + val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows") + + val baseIterator = watermarkPredicateForDataForLateEvents match { + case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate) + case None => iter + } + + val updatesStartTimeNs = System.nanoTime + + val result = baseIterator.filter { r => + val row = r.asInstanceOf[UnsafeRow] + val key = getKey(row) + val value = store.get(key) + if (value == null) { + val timestamp = row.getLong(eventTimeColOrdinal) + // The unit of timestamp in Spark is microseconds, convert the delay threshold. + val expiresAt = timestamp + delayThresholdMillis * 1000 + + timeoutRow.setLong(0, expiresAt) + store.put(key, timeoutRow) + + numUpdatedStateRows += 1 + numOutputRows += 1 + true + } else { + // Drop duplicated rows + numDroppedDuplicateRows += 1 + false + } + } + + CompletionIterator[InternalRow, Iterator[InternalRow]](result, { + allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs) + allRemovalsTimeMs += timeTakenMs { + // Convert watermark value to microsecond + val watermarkForEviction = eventTimeWatermarkForEviction.get * 1000 + store.iterator().foreach { rowPair => Review Comment: Yeah, we leverage range scans in the native support for session windows with the RocksDB state store provider. Maybe that would enable us to apply further optimizations here, although we should still be very careful about changing the state format. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org