This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25d96f7bacb4 [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl
25d96f7bacb4 is described below

commit 25d96f7bacb43a7d5a835454ecc075e40d4f3c93
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Fri Feb 2 22:32:42 2024 +0900

    [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl
    
    ### What changes were proposed in this pull request?
    
    Adds the `deleteIfExists` method to the `StatefulProcessorHandle` so that state variables can be removed from the State Store. It is implemented only for RocksDBStateStoreProvider, as we do not currently support multiple column families for the HDFS-backed provider.
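
    A minimal usage sketch, mirroring the test processors added below (the processor and state-variable names here are hypothetical):

    ```scala
    import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor, StatefulProcessorHandle, TimerValues, ValueState}

    // Hypothetical processor: drops a state variable left over from a previous
    // run of the query, then registers the variables it still needs.
    class CleanupProcessor extends StatefulProcessor[String, String, String] {
      @transient private var _state: ValueState[String] = _

      override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = {
        handle.deleteIfExists("obsoleteState") // no-op if the variable was never defined
        _state = handle.getValueState[String]("currentState")
      }

      override def handleInputRows(
          key: String,
          inputRows: Iterator[String],
          timerValues: TimerValues): Iterator[String] = inputRows

      override def close(): Unit = {}
    }
    ```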
    
    ### Why are the changes needed?
    
    This functionality is needed so that users can remove state variables from the State Store via the StatefulProcessorHandleImpl.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes - this functionality (removing column families) was previously not supported by our RocksDB client.
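
    For background, this is a hedged sketch of the underlying RocksJava operation the new code path builds on (the path and column family name are illustrative, not the Spark code path itself):

    ```scala
    import org.rocksdb.{ColumnFamilyDescriptor, ColumnFamilyOptions, Options, RocksDB}

    object DropColumnFamilyDemo extends App {
      RocksDB.loadLibrary()
      val db = RocksDB.open(new Options().setCreateIfMissing(true), "/tmp/rocksdb-demo")

      // Create a column family (the new design keeps one per state variable) ...
      val handle = db.createColumnFamily(
        new ColumnFamilyDescriptor("countState".getBytes, new ColumnFamilyOptions()))

      // ... and drop it again, removing the column family and all of its data.
      db.dropColumnFamily(handle)
      handle.close()
      db.close()
    }
    ```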
    
    ### How was this patch tested?
    
    Added a unit test that creates two streams with the same checkpoint directory. Upon initialization, the second stream removes state that was created by the first stream. We verify that the state from the previous stream is not kept (see the new test in TransformWithStateSuite below).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #44903 from ericm-db/deleteIfExists.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  11 +++
 ...r-conditions-unsupported-feature-error-class.md |   4 +
 docs/sql-error-conditions.md                       |   6 ++
 .../sql/streaming/StatefulProcessorHandle.scala    |   6 ++
 .../streaming/StatefulProcessorHandleImpl.scala    |  12 +++
 .../state/HDFSBackedStateStoreProvider.scala       |   6 ++
 .../sql/execution/streaming/state/RocksDB.scala    |  16 ++++
 .../state/RocksDBStateStoreProvider.scala          |   5 +
 .../sql/execution/streaming/state/StateStore.scala |   6 ++
 .../streaming/state/StateStoreErrors.scala         |  22 +++++
 .../streaming/state/MemoryStateStore.scala         |   4 +
 .../sql/streaming/TransformWithStateSuite.scala    | 104 +++++++++++++++++++++
 12 files changed, 202 insertions(+)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index baefb05a7070..136825ab374d 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3241,6 +3241,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : {
+    "message" : [
+      "Failed to remove default column family with reserved 
name=<colFamilyName>."
+    ],
+    "sqlState" : "42802"
+  },
   "STATE_STORE_MULTIPLE_VALUES_PER_KEY" : {
     "message" : [
       "Store does not support multiple values per key"
@@ -3950,6 +3956,11 @@
           "Creating multiple column families with <stateStoreProvider> is not 
supported."
         ]
       },
+      "STATE_STORE_REMOVING_COLUMN_FAMILIES" : {
+        "message" : [
+          "Removing column families with <stateStoreProvider> is not 
supported."
+        ]
+      },
       "TABLE_OPERATION" : {
         "message" : [
           "Table <tableName> does not support <operation>. Please check the 
current catalog and namespace to make sure the qualified table name is 
expected, and also check the catalog implementation which is configured by 
\"spark.sql.catalog\"."
diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md b/docs/sql-error-conditions-unsupported-feature-error-class.md
index 1b12c4bfc1b3..8d42ecdce790 100644
--- a/docs/sql-error-conditions-unsupported-feature-error-class.md
+++ b/docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -194,6 +194,10 @@ set PROPERTIES and DBPROPERTIES at the same time.
 
 Creating multiple column families with `<stateStoreProvider>` is not supported.
 
+## STATE_STORE_REMOVING_COLUMN_FAMILIES
+
+Removing column families with `<stateStoreProvider>` is not supported.
+
 ## TABLE_OPERATION
 
 Table `<tableName>` does not support `<operation>`. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 3a2c4d261352..c704b1c10c46 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2025,6 +2025,12 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists
 
 Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.
 
+### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to remove default column family with reserved name=`<colFamilyName>`.
+
 ### STATE_STORE_MULTIPLE_VALUES_PER_KEY
 
 [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
index 5eaccceb947c..738928b5cc36 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
@@ -40,4 +40,10 @@ private[sql] trait StatefulProcessorHandle extends Serializable {
 
   /** Function to return queryInfo for currently running task */
   def getQueryInfo(): QueryInfo
+
+  /**
+   * Function to delete and purge the state variable, if it was previously defined.
+   * @param stateName - name of the state variable
+   */
+  def deleteIfExists(stateName: String): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
index d0cd8f7dc0a3..d06938ffeafb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
@@ -122,4 +122,16 @@ class StatefulProcessorHandleImpl(
   }
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
+
+  /**
+   * Function to delete and purge the state variable, if it was previously defined.
+   *
+   * @param stateName - name of the state variable
+   */
+  override def deleteIfExists(stateName: String): Unit = {
+    verify(currState == CREATED, s"Cannot delete state variable with name=$stateName after " +
+      "initialization is complete")
+    store.removeColFamilyIfExists(stateName)
+  }
+
 }
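
Note: per the `verify` guard above, `deleteIfExists` is only legal while the handle is still in the CREATED state, i.e. during `init()`. A sketch of the failing case (hypothetical processor code; the exact failure behavior is an assumption based on that guard):

```scala
// Hypothetical: calling deleteIfExists from handleInputRows, i.e. after
// initialization is complete, fails the verify() check above
// ("Cannot delete state variable with name=countState after
// initialization is complete" -- assumed behavior).
override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
  _processorHandle.deleteIfExists("countState") // fails here
  inputRows
}
```
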
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 842c4004820c..ffb618d0fbb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -201,6 +201,12 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     override def toString(): String = {
       s"HDFSStateStore[id=(op=${id.operatorId},part=${id.partitionId}),dir=$baseDir]"
     }
+
+    override def removeColFamilyIfExists(colFamilyName: String): Unit = {
+      throw StateStoreErrors.removingColumnFamiliesNotSupported(
+        "HDFSBackedStateStoreProvider")
+
+    }
   }
 
   def getMetricsForProvider(): Map[String, Long] = synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index cf453394ba47..bf1a1c50d350 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -264,6 +264,22 @@ class RocksDB(
     }
   }
 
+  /**
+   * Remove the RocksDB column family, if it exists.
+   */
+  def removeColFamilyIfExists(colFamilyName: String): Unit = {
+    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+      throw StateStoreErrors.cannotRemoveDefaultColumnFamily(colFamilyName)
+    }
+
+    if (checkColFamilyExists(colFamilyName)) {
+      assert(db != null)
+      val handle = colFamilyNameToHandleMap(colFamilyName)
+      db.dropColumnFamily(handle)
+      colFamilyNameToHandleMap.remove(colFamilyName)
+    }
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index d38e21aac181..e469fd4fe1c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -189,6 +189,11 @@ private[sql] class RocksDBStateStoreProvider
 
     /** Return the [[RocksDB]] instance in this store. This is exposed mainly for testing. */
     def dbInstance(): RocksDB = rocksDB
+
+    /** Remove column family, if it exists */
+    override def removeColFamilyIfExists(colFamilyName: String): Unit = {
+      rocksDB.removeColFamilyIfExists(colFamilyName)
+    }
   }
 
   override def init(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index a8af14e82230..4b409b8a66b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -103,6 +103,12 @@ trait ReadStateStore {
  * double resource cleanup.
  */
 trait StateStore extends ReadStateStore {
+
+  /**
+   * Remove column family with given name, if present.
+   */
+  def removeColFamilyIfExists(colFamilyName: String): Unit
+
   /**
    * Create column family with given name, if absent.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 665dafc6f66a..bbc6d4c78f90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -37,6 +37,16 @@ object StateStoreErrors {
       new StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider)
     }
 
+  def removingColumnFamiliesNotSupported(stateStoreProvider: String):
+    StateStoreRemovingColumnFamiliesNotSupportedException = {
+      new StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider)
+    }
+
+  def cannotRemoveDefaultColumnFamily(colFamilyName: String):
+    StateStoreCannotRemoveDefaultColumnFamily = {
+      new StateStoreCannotRemoveDefaultColumnFamily(colFamilyName)
+    }
+
   def unsupportedOperationException(operationName: String, entity: String):
     StateStoreUnsupportedOperationException = {
       new StateStoreUnsupportedOperationException(operationName, entity)
@@ -48,6 +58,18 @@ class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider:
     errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
     messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
   )
+class StateStoreRemovingColumnFamiliesNotSupportedException(stateStoreProvider: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_REMOVING_COLUMN_FAMILIES",
+    messageParameters = Map("stateStoreProvider" -> stateStoreProvider)
+  )
+
+class StateStoreCannotRemoveDefaultColumnFamily(colFamilyName: String)
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY",
+    messageParameters = Map("colFamilyName" -> colFamilyName)
+  )
+
 
 class StateStoreUnsupportedOperationException(operationType: String, entity: String)
   extends SparkUnsupportedOperationException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
index 5229865122be..02052d307f41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
@@ -33,6 +33,10 @@ class MemoryStateStore extends StateStore() {
    throw StateStoreErrors.multipleColumnFamiliesNotSupported("MemoryStateStoreProvider")
   }
 
+  override def removeColFamilyIfExists(colFamilyName: String): Unit = {
+    throw StateStoreErrors.removingColumnFamiliesNotSupported("MemoryStateStoreProvider")
+  }
+
   override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = map.get(key)
 
   override def put(key: UnsafeRow, newValue: UnsafeRow, colFamilyName: String): Unit =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 70a71f745066..7a6c3f00fc7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -58,6 +58,73 @@ class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (S
   override def close(): Unit = {}
 }
 
+class RunningCountMostRecentStatefulProcessor
+  extends StatefulProcessor[String, (String, String), (String, String, String)]
+  with Logging {
+  @transient private var _countState: ValueState[Long] = _
+  @transient private var _mostRecent: ValueState[String] = _
+  @transient var _processorHandle: StatefulProcessorHandle = _
+
+  override def init(
+      handle: StatefulProcessorHandle,
+      outputMode: OutputMode) : Unit = {
+    _processorHandle = handle
+    assert(handle.getQueryInfo().getBatchId >= 0)
+    _countState = _processorHandle.getValueState[Long]("countState")
+    _mostRecent = _processorHandle.getValueState[String]("mostRecent")
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[(String, String)],
+      timerValues: TimerValues): Iterator[(String, String, String)] = {
+    val count = _countState.getOption().getOrElse(0L) + 1
+    val mostRecent = _mostRecent.getOption().getOrElse("")
+
+    var output = List[(String, String, String)]()
+    inputRows.foreach { row =>
+      _mostRecent.update(row._2)
+      _countState.update(count)
+      output = (key, count.toString, mostRecent) :: output
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+class MostRecentStatefulProcessorWithDeletion
+  extends StatefulProcessor[String, (String, String), (String, String)]
+  with Logging {
+  @transient private var _mostRecent: ValueState[String] = _
+  @transient var _processorHandle: StatefulProcessorHandle = _
+
+  override def init(
+       handle: StatefulProcessorHandle,
+       outputMode: OutputMode) : Unit = {
+    _processorHandle = handle
+    assert(handle.getQueryInfo().getBatchId >= 0)
+    _processorHandle.deleteIfExists("countState")
+    _mostRecent = _processorHandle.getValueState[String]("mostRecent")
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[(String, String)],
+      timerValues: TimerValues): Iterator[(String, String)] = {
+    val mostRecent = _mostRecent.getOption().getOrElse("")
+
+    var output = List[(String, String)]()
+    inputRows.foreach { row =>
+      _mostRecent.update(row._2)
+      output = (key, mostRecent) :: output
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
 class RunningCountStatefulProcessorWithError extends RunningCountStatefulProcessor {
   @transient private var _tempState: ValueState[Long] = _
 
@@ -129,6 +196,43 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       )
     }
   }
+
+  test("transformWithState - test deleteIfExists operator") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+      withTempDir { chkptDir =>
+        val dirPath = chkptDir.getCanonicalPath
+        val inputData = MemoryStream[(String, String)]
+        val stream1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeoutMode.NoTimeouts(),
+            OutputMode.Update())
+
+        val stream2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeoutMode.NoTimeouts(),
+            OutputMode.Update())
+
+        testStream(stream1, OutputMode.Update())(
+          StartStream(checkpointLocation = dirPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          StopStream
+        )
+        testStream(stream2, OutputMode.Update())(
+          StartStream(checkpointLocation = dirPath),
+          AddData(inputData, ("a", "str2"), ("b", "str3")),
+          CheckNewAnswer(("a", "str1"),
+            ("b", "")), // should not factor in previous count state
+          StopStream
+        )
+      }
+    }
+  }
 }
 
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {

