This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 56730f6390a1 [SPARK-46731][SS] Manage state store provider instance by state data source - reader
56730f6390a1 is described below

commit 56730f6390a19aeada75b866e64115a957212877
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Sat Jan 20 08:12:02 2024 +0900

    [SPARK-46731][SS] Manage state store provider instance by state data source - reader
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to change the reader part of the state data source so that it manages
    the state store provider instance by itself.
    
    ### Why are the changes needed?
    
    Currently, the state data source initializes the state store instance via StateStore.get(),
    which also initializes the state store provider instance and registers it with the
    coordinator. This incurs unnecessary overhead, e.g. the maintenance task could be
    triggered for this provider.
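
    As a rough illustration of the new flow (a sketch, not the literal patch: `providerId`,
    `keySchema`, `valueSchema`, `numColsPrefixKey`, `storeConf`, `hadoopConf` and `batchId`
    stand in for values the partition reader already has, while `StateStoreProvider.createAndInit`,
    `getReadStore`, `abort` and `close` are the calls this change relies on):

        // Hedged sketch of the provider lifecycle the reader now manages itself.
        val provider = StateStoreProvider.createAndInit(
          providerId, keySchema, valueSchema, numColsPrefixKey, storeConf, hadoopConf)
        try {
          // Read-only store for the state as of the requested batch.
          val store = provider.getReadStore(batchId + 1)
          // ... iterate state rows from `store` ...
          store.abort()    // the reader never commits
        } finally {
          provider.close() // no coordinator is involved, so close the provider explicitly
        }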
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44751 from HeartSaVioR/SPARK-46731.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../v2/state/StatePartitionReader.scala            | 16 +++++++------
 .../StreamStreamJoinStatePartitionReader.scala     |  3 ++-
 .../state/SymmetricHashJoinStateManager.scala      | 28 ++++++++++++++++++----
 3 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
index ef8d7bf628bf..b79079aca56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
-import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
@@ -53,15 +53,13 @@ class StatePartitionReader(
   private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
   private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]
 
-  private lazy val store: ReadStateStore = {
+  private lazy val provider: StateStoreProvider = {
     val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
-
     val allStateStoreMetadata = new StateMetadataPartitionReader(
      partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
       .stateMetadata.toArray
-
     val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
       entry.operatorId == partition.sourceOptions.operatorId &&
         entry.stateStoreName == partition.sourceOptions.storeName
@@ -78,9 +76,12 @@ class StatePartitionReader(
       stateStoreMetadata.head.numColsPrefixKey
     }
 
-    StateStore.getReadOnly(stateStoreProviderId, keySchema, valueSchema,
-      numColsPrefixKey = numColsPrefixKey, version = partition.sourceOptions.batchId + 1,
-      storeConf = storeConf, hadoopConf = hadoopConf.value)
+    StateStoreProvider.createAndInit(
+      stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey, storeConf, hadoopConf.value)
+  }
+
+  private lazy val store: ReadStateStore = {
+    provider.getReadStore(partition.sourceOptions.batchId + 1)
   }
 
   private lazy val iter: Iterator[InternalRow] = {
@@ -104,6 +105,7 @@ class StatePartitionReader(
   override def close(): Unit = {
     current = null
     store.abort()
+    provider.close()
   }
 
   private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
index d0dd6cb7d1b9..e5a5dddefef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
@@ -115,7 +115,8 @@ class StreamStreamJoinStatePartitionReader(
         hadoopConf = hadoopConf.value,
         partitionId = partition.partition,
         formatVersion,
-        skippedNullValueCount = None
+        skippedNullValueCount = None,
+        useStateStoreCoordinator = false
       )
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index b67c5ffd09a1..58e5301ed559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -50,6 +50,12 @@ import org.apache.spark.util.NextIterator
  * @param hadoopConf            Hadoop configuration for reading state data from storage
  * @param partitionId           A partition ID of source RDD.
  * @param stateFormatVersion    The version of format for state.
+ * @param skippedNullValueCount The instance of SQLMetric tracking the number of skipped null
+ *                              values.
+ * @param useStateStoreCoordinator  Whether to use a state store coordinator to maintain the state
+ *                                  store providers being used in this class. If true, Spark will
+ *                                  take care of management for state store providers, e.g. running
+ *                                  maintenance task for these providers.
  *
  * Internally, the key -> multiple values is stored in two [[StateStore]]s.
  * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values
@@ -79,7 +85,8 @@ class SymmetricHashJoinStateManager(
     hadoopConf: Configuration,
     partitionId: Int,
     stateFormatVersion: Int,
-    skippedNullValueCount: Option[SQLMetric] = None) extends Logging {
+    skippedNullValueCount: Option[SQLMetric] = None,
+    useStateStoreCoordinator: Boolean = true) extends Logging {
   import SymmetricHashJoinStateManager._
 
   /*
@@ -443,6 +450,7 @@ class SymmetricHashJoinStateManager(
 
   /** Helper trait for invoking common functionalities of a state store. */
   private abstract class StateStoreHandler(stateStoreType: StateStoreType) extends Logging {
+    private var stateStoreProvider: StateStoreProvider = _
 
     /** StateStore that the subclasses of this class is going to operate on */
     protected def stateStore: StateStore
@@ -457,6 +465,11 @@ class SymmetricHashJoinStateManager(
         logInfo(s"Aborted store ${stateStore.id}")
         stateStore.abort()
       }
+      // If this class manages a state store provider by itself, it should take care of closing
+      // provider instance as well.
+      if (stateStoreProvider != null) {
+        stateStoreProvider.close()
+      }
     }
 
     def metrics: StateStoreMetrics = stateStore.metrics
@@ -465,9 +478,16 @@ class SymmetricHashJoinStateManager(
     protected def getStateStore(keySchema: StructType, valueSchema: StructType): StateStore = {
       val storeProviderId = StateStoreProviderId(
         stateInfo.get, partitionId, getStateStoreName(joinSide, stateStoreType))
-      val store = StateStore.get(
-        storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
-        stateInfo.get.storeVersion, storeConf, hadoopConf)
+      val store = if (useStateStoreCoordinator) {
+        StateStore.get(
+          storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0,
+          stateInfo.get.storeVersion, storeConf, hadoopConf)
+      } else {
+        // This class will manage the state store provider by itself.
+        stateStoreProvider = StateStoreProvider.createAndInit(
+          storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0, storeConf, hadoopConf)
+        stateStoreProvider.getStore(stateInfo.get.storeVersion)
+      }
       logInfo(s"Loaded store ${store.id}")
       store
     }
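
For context on the new useStateStoreCoordinator flag, a minimal sketch of the ownership contract
the last two hunks establish (assuming `storeProviderId`, `keySchema`, `valueSchema`, `version`,
`storeConf` and `hadoopConf` are in scope as in the surrounding class; this is not the literal
Spark code):

    // Hedged sketch: provider ownership when useStateStoreCoordinator = false.
    // The coordinator never sees this provider, so whoever created it must close it.
    val stateStoreProvider = StateStoreProvider.createAndInit(
      storeProviderId, keySchema, valueSchema, numColsPrefixKey = 0, storeConf, hadoopConf)
    try {
      val store = stateStoreProvider.getStore(version)
      // ... read or update join state through `store`, then commit or abort it ...
    } finally {
      stateStoreProvider.close() // self-managed: no maintenance task, no coordinator cleanup
    }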

