This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 74a9c6cfc6ea [SPARK-45511][SS] State Data Source - Reader
74a9c6cfc6ea is described below

commit 74a9c6cfc6ea937031fe1ca5db539139322339a5
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Nov 15 10:45:14 2023 +0900

    [SPARK-45511][SS] State Data Source - Reader
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce a baseline implementation of state processor 
- reader.
    
    State processor is a new data source which enables reading and writing the 
state in the existing checkpoint with the batch query. Since we implement the 
feature as data source, we are leveraging the UX for DataFrame API which most 
users are already familiar with.
    
    Functionalities of the baseline implementation are following:
    
    * Specify a state store instance via store name (default: DEFAULT)
    * Specify a stateful operator via operator ID (default: 0)
    * Specify a batch ID (default: last committed)
    * Specify the source option joinSide to construct input rows in the state 
store for stream-stream join
      * It is still possible that users can read a specific state store 
instance from 4 instances in stream-stream join, which would be used mostly for 
debugging Spark itself
      * When this is enabled, the data source hides the internal column from 
the output.
    * Specify a metadata column (_partition_id)so that users can indicate the 
partition ID for the state row.
    
    ### Why are the changes needed?
    
    Please refer to the SPIP doc for rationale: 
https://docs.google.com/document/d/1_iVf_CIu2RZd3yWWF6KoRNlBiz5NbSIK0yThqG0EvPY/edit?usp=sharing
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, we are adding a new data source.
    
    ### How was this patch tested?
    
    New test suite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43425 from HeartSaVioR/SPARK-45511.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../CheckConnectJvmClientCompatibility.scala       |   2 +
 ...org.apache.spark.sql.sources.DataSourceRegister |   3 +-
 .../scala/org/apache/spark/sql/RuntimeConfig.scala |   2 +-
 .../datasources/v2/state/StateDataSource.scala     | 212 ++++++
 .../v2/state/StatePartitionReader.scala            | 108 +++
 .../datasources/v2/state/StateScanBuilder.scala    | 130 ++++
 .../datasources/v2/state/StateTable.scala          | 104 +++
 .../v2/state/StreamStreamJoinStateHelper.scala     |  88 +++
 .../StreamStreamJoinStatePartitionReader.scala     | 177 +++++
 .../datasources/v2/state/utils/SchemaUtil.scala    |  29 +
 .../streaming/StreamingCheckpointConstants.scala   |  24 +
 .../state/StateSchemaCompatibilityChecker.scala    |   5 +-
 .../state/SymmetricHashJoinStateManager.scala      |  53 ++
 .../v2/state/StateDataSourceReadSuite.scala        | 779 +++++++++++++++++++++
 .../v2/state/StateDataSourceTestBase.scala         | 410 +++++++++++
 15 files changed, 2121 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index fb4f80998fcf..5178013e455b 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -248,6 +248,8 @@ object CheckConnectJvmClientCompatibility {
 
       // RuntimeConfig
       
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RuntimeConfig$"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.RuntimeConfig.sqlConf"),
 
       // DataStreamWriter
       ProblemFilters.exclude[MissingClassProblem](
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index b4c18c38f04a..9e32329beb9d 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -28,4 +28,5 @@ 
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
 org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
 org.apache.spark.sql.execution.streaming.sources.RatePerMicroBatchProvider
-org.apache.spark.sql.execution.datasources.v2.state.StateMetadataSource
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.v2.state.StateMetadataSource
+org.apache.spark.sql.execution.datasources.v2.state.StateDataSource
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index f879a13097bb..936d40f5c387 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
  * @since 2.0.0
  */
 @Stable
-class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
+class RuntimeConfig private[sql](val sqlConf: SQLConf = new SQLConf) {
 
   /**
    * Sets the given Spark runtime configuration property.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
new file mode 100644
index 000000000000..55173a7e8878
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util
+import java.util.UUID
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.catalyst.DataSourceOptions
+import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
+import org.apache.spark.sql.connector.expressions.Transform
+import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
+import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, 
OffsetSeqMetadata}
+import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DIR_NAME_COMMITS,
 DIR_NAME_OFFSETS, DIR_NAME_STATE}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide,
 RightSide}
+import 
org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker,
 StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * An implementation of [[TableProvider]] with [[DataSourceRegister]] for 
State Store data source.
+ */
+class StateDataSource extends TableProvider with DataSourceRegister {
+  private lazy val session: SparkSession = SparkSession.active
+
+  private lazy val hadoopConf: Configuration = 
session.sessionState.newHadoopConf()
+
+  override def shortName(): String = "statestore"
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    val sourceOptions = StateSourceOptions.apply(session, hadoopConf, 
properties)
+    val stateConf = buildStateStoreConf(sourceOptions.resolvedCpLocation, 
sourceOptions.batchId)
+    new StateTable(session, schema, sourceOptions, stateConf)
+  }
+
+  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
+    val partitionId = StateStore.PARTITION_ID_TO_CHECK_SCHEMA
+    val sourceOptions = StateSourceOptions.apply(session, hadoopConf, options)
+
+    val stateCheckpointLocation = sourceOptions.stateCheckpointLocation
+    try {
+      val (keySchema, valueSchema) = sourceOptions.joinSide match {
+        case JoinSideValues.left =>
+          StreamStreamJoinStateHelper.readKeyValueSchema(session, 
stateCheckpointLocation.toString,
+            sourceOptions.operatorId, LeftSide)
+
+        case JoinSideValues.right =>
+          StreamStreamJoinStateHelper.readKeyValueSchema(session, 
stateCheckpointLocation.toString,
+            sourceOptions.operatorId, RightSide)
+
+        case JoinSideValues.none =>
+          val storeId = new StateStoreId(stateCheckpointLocation.toString, 
sourceOptions.operatorId,
+            partitionId, sourceOptions.storeName)
+          val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
+          val manager = new StateSchemaCompatibilityChecker(providerId, 
hadoopConf)
+          manager.readSchemaFile()
+      }
+
+      new StructType()
+        .add("key", keySchema)
+        .add("value", valueSchema)
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException("Failed to read the state schema. 
Either the file " +
+          s"does not exist, or the file is corrupted. options: 
$sourceOptions", e)
+    }
+  }
+
+  private def buildStateStoreConf(checkpointLocation: String, batchId: Long): 
StateStoreConf = {
+    val offsetLog = new OffsetSeqLog(session,
+      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    offsetLog.get(batchId) match {
+      case Some(value) =>
+        val metadata = value.metadata.getOrElse(
+          throw new IllegalStateException(s"Metadata is not available for 
offset log for " +
+            s"$batchId, checkpoint location $checkpointLocation")
+        )
+
+        val clonedRuntimeConf = new 
RuntimeConfig(session.sessionState.conf.clone())
+        OffsetSeqMetadata.setSessionConf(metadata, clonedRuntimeConf)
+        StateStoreConf(clonedRuntimeConf.sqlConf)
+
+      case _ =>
+        throw new IllegalStateException(s"The offset log for $batchId does not 
exist, " +
+          s"checkpoint location $checkpointLocation")
+    }
+  }
+
+  override def supportsExternalMetadata(): Boolean = false
+}
+
+case class StateSourceOptions(
+    resolvedCpLocation: String,
+    batchId: Long,
+    operatorId: Int,
+    storeName: String,
+    joinSide: JoinSideValues) {
+  def stateCheckpointLocation: Path = new Path(resolvedCpLocation, 
DIR_NAME_STATE)
+}
+
+object StateSourceOptions extends DataSourceOptions {
+  val PATH = newOption("path")
+  val BATCH_ID = newOption("batchId")
+  val OPERATOR_ID = newOption("operatorId")
+  val STORE_NAME = newOption("storeName")
+  val JOIN_SIDE = newOption("joinSide")
+
+  object JoinSideValues extends Enumeration {
+    type JoinSideValues = Value
+    val left, right, none = Value
+  }
+
+  def apply(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      properties: util.Map[String, String]): StateSourceOptions = {
+    apply(sparkSession, hadoopConf, new CaseInsensitiveStringMap(properties))
+  }
+
+  def apply(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      options: CaseInsensitiveStringMap): StateSourceOptions = {
+    val checkpointLocation = Option(options.get(PATH)).orElse {
+      throw new IllegalArgumentException(s"'$PATH' must be specified.")
+    }.get
+
+    val resolvedCpLocation = resolvedCheckpointLocation(hadoopConf, 
checkpointLocation)
+
+    val batchId = Option(options.get(BATCH_ID)).map(_.toLong).orElse {
+      Some(getLastCommittedBatch(sparkSession, resolvedCpLocation))
+    }.get
+
+    if (batchId < 0) {
+      throw new IllegalArgumentException(s"'$BATCH_ID' cannot be negative.")
+    }
+
+    val operatorId = Option(options.get(OPERATOR_ID)).map(_.toInt)
+      .orElse(Some(0)).get
+
+    if (operatorId < 0) {
+      throw new IllegalArgumentException(s"'$OPERATOR_ID' cannot be negative.")
+    }
+
+    val storeName = Option(options.get(STORE_NAME))
+      .map(_.trim)
+      .getOrElse(StateStoreId.DEFAULT_STORE_NAME)
+
+    if (storeName.isEmpty) {
+      throw new IllegalArgumentException(s"'$STORE_NAME' cannot be an empty 
string.")
+    }
+
+    val joinSide = try {
+      Option(options.get(JOIN_SIDE))
+        .map(JoinSideValues.withName).getOrElse(JoinSideValues.none)
+    } catch {
+      case _: NoSuchElementException =>
+        // convert to IllegalArgumentException
+        throw new IllegalArgumentException(s"Incorrect value of the option " +
+          s"'$JOIN_SIDE'. Valid values are 
${JoinSideValues.values.mkString(",")}")
+    }
+
+    if (joinSide != JoinSideValues.none && storeName != 
StateStoreId.DEFAULT_STORE_NAME) {
+      throw new IllegalArgumentException(s"The options '$JOIN_SIDE' and " +
+        s"'$STORE_NAME' cannot be specified together. Please specify either 
one.")
+    }
+
+    StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, 
joinSide)
+  }
+
+  private def resolvedCheckpointLocation(
+      hadoopConf: Configuration,
+      checkpointLocation: String): String = {
+    val checkpointPath = new Path(checkpointLocation)
+    val fs = checkpointPath.getFileSystem(hadoopConf)
+    checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
+  }
+
+  private def getLastCommittedBatch(session: SparkSession, checkpointLocation: 
String): Long = {
+    val commitLog = new CommitLog(session,
+      new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+    commitLog.getLatest() match {
+      case Some((lastId, _)) => lastId
+      case None => throw new IllegalStateException("No committed batch found, 
" +
+        s"checkpoint location: $checkpointLocation")
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
new file mode 100644
index 000000000000..824034f42eaf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
JoinedRow, UnsafeRow}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, 
StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * An implementation of [[PartitionReaderFactory]] for State data source. This 
is used to support
+ * general read from a state store instance, rather than specific to the 
operator.
+ */
+class StatePartitionReaderFactory(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    schema: StructType) extends PartitionReaderFactory {
+
+  override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
+    new StatePartitionReader(storeConf, hadoopConf,
+      partition.asInstanceOf[StateStoreInputPartition], schema)
+  }
+}
+
+/**
+ * An implementation of [[PartitionReader]] for State data source. This is 
used to support
+ * general read from a state store instance, rather than specific to the 
operator.
+ */
+class StatePartitionReader(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType) extends PartitionReader[InternalRow] {
+
+  private val keySchema = SchemaUtil.getSchemaAsDataType(schema, 
"key").asInstanceOf[StructType]
+  private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, 
"value").asInstanceOf[StructType]
+
+  private lazy val store: ReadStateStore = {
+    val stateStoreId = 
StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
+      partition.sourceOptions.operatorId, partition.partition, 
partition.sourceOptions.storeName)
+    val stateStoreProviderId = StateStoreProviderId(stateStoreId, 
partition.queryId)
+
+    // TODO: This does not handle the case of session window aggregation; we 
don't have an
+    //  information whether the state store uses prefix scan or not. We will 
have to add such
+    //  information to determine the right encoder/decoder for the data.
+    StateStore.getReadOnly(stateStoreProviderId, keySchema, valueSchema,
+      numColsPrefixKey = 0, version = partition.sourceOptions.batchId + 1, 
storeConf = storeConf,
+      hadoopConf = hadoopConf.value)
+  }
+
+  private lazy val iter: Iterator[InternalRow] = {
+    store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
+  }
+
+  private var current: InternalRow = _
+
+  override def next(): Boolean = {
+    if (iter.hasNext) {
+      current = iter.next()
+      true
+    } else {
+      current = null
+      false
+    }
+  }
+
+  private val joinedRow = new JoinedRow()
+
+  private def addMetadata(row: InternalRow): InternalRow = {
+    val metadataRow = new GenericInternalRow(
+      StateTable.METADATA_COLUMNS.map(_.name()).map {
+        case "_partition_id" => partition.partition.asInstanceOf[Any]
+      }.toArray
+    )
+    joinedRow.withLeft(row).withRight(metadataRow)
+  }
+
+  override def get(): InternalRow = addMetadata(current)
+
+  override def close(): Unit = {
+    current = null
+    store.abort()
+  }
+
+  private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
+    val row = new GenericInternalRow(2)
+    row.update(0, pair._1)
+    row.update(1, pair._2)
+    row
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
new file mode 100644
index 000000000000..214f5f973303
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, 
PartitionReaderFactory, Scan, ScanBuilder}
+import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide,
 RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/** An implementation of [[ScanBuilder]] for State Store data source. */
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    sourceOptions: StateSourceOptions,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, sourceOptions, 
stateStoreConf)
+}
+
+/** An implementation of [[InputPartition]] for State Store data source. */
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val sourceOptions: StateSourceOptions) extends InputPartition
+
+/** An implementation of [[Scan]] with [[Batch]] for State Store data source. 
*/
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    sourceOptions: StateSourceOptions,
+    stateStoreConf: StateStoreConf) extends Scan with Batch {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val hadoopConfBroadcast = session.sparkContext.broadcast(
+    new SerializableConfiguration(session.sessionState.newHadoopConf()))
+
+  override def readSchema(): StructType = schema
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val fs = 
stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
+    val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new 
PathFilter() {
+      override def accept(path: Path): Boolean = {
+        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && 
path.getName.toInt >= 0
+      }
+    })
+
+    if (partitions.headOption.isEmpty) {
+      throw new IllegalArgumentException("The state does not have any 
partition. Please double " +
+        s"check that the query points to the valid state. options: 
$sourceOptions")
+    } else {
+      // just a dummy query id because we are actually not running streaming 
query
+      val queryId = UUID.randomUUID()
+
+      val partitionsSorted = partitions.sortBy(fs => fs.getPath.getName.toInt)
+      val partitionNums = partitionsSorted.map(_.getPath.getName.toInt)
+      // assuming no same number - they're directories hence no same name
+      val head = partitionNums.head
+      val tail = partitionNums(partitionNums.length - 1)
+      assert(head == 0, "Partition should start with 0")
+      assert((tail - head + 1) == partitionNums.length,
+        s"No continuous partitions in state: 
${partitionNums.mkString("Array(", ", ", ")")}")
+
+      partitionNums.map {
+        pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
+      }.toArray
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = 
sourceOptions.joinSide match {
+    case JoinSideValues.left =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        sourceOptions.stateCheckpointLocation.toString, 
sourceOptions.operatorId, LeftSide,
+        excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.right =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        sourceOptions.stateCheckpointLocation.toString, 
sourceOptions.operatorId, RightSide,
+        excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.none =>
+      new StatePartitionReaderFactory(stateStoreConf, 
hadoopConfBroadcast.value, schema)
+  }
+
+  override def toBatch: Batch = this
+
+  override def description(): String = {
+    val desc = s"StateScan " +
+      s"[stateCkptLocation=${sourceOptions.stateCheckpointLocation}]" +
+      
s"[batchId=${sourceOptions.batchId}][operatorId=${sourceOptions.operatorId}]" +
+      s"[storeName=${sourceOptions.storeName}]"
+
+    if (sourceOptions.joinSide != JoinSideValues.none) {
+      desc + s"[joinSide=${sourceOptions.joinSide}]"
+    } else {
+      desc
+    }
+  }
+
+  private def stateCheckpointPartitionsLocation: Path = {
+    new Path(sourceOptions.stateCheckpointLocation, 
sourceOptions.operatorId.toString)
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
new file mode 100644
index 000000000000..264ec09c00b2
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** An implementation of [[Table]] with [[SupportsRead]] for State Store data 
source. */
+class StateTable(
+    session: SparkSession,
+    override val schema: StructType,
+    sourceOptions: StateSourceOptions,
+    stateConf: StateStoreConf)
+  extends Table with SupportsRead with SupportsMetadataColumns {
+
+  import StateTable._
+
+  if (!isValidSchema(schema)) {
+    throw new IllegalStateException(s"Invalid schema is provided. Provided 
schema: $schema for " +
+      s"checkpoint location: ${sourceOptions.stateCheckpointLocation} , 
operatorId: " +
+      s"${sourceOptions.operatorId} , storeName: ${sourceOptions.storeName}, " 
+
+      s"joinSide: ${sourceOptions.joinSide}")
+  }
+
+  override def name(): String = {
+    val desc = s"StateTable " +
+      s"[stateCkptLocation=${sourceOptions.stateCheckpointLocation}]" +
+      
s"[batchId=${sourceOptions.batchId}][operatorId=${sourceOptions.operatorId}]" +
+      s"[storeName=${sourceOptions.storeName}]"
+
+    if (sourceOptions.joinSide != JoinSideValues.none) {
+      desc + s"[joinSide=${sourceOptions.joinSide}]"
+    } else {
+      desc
+    }
+  }
+
+  override def capabilities(): util.Set[TableCapability] = CAPABILITY
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+    new StateScanBuilder(session, schema, sourceOptions, stateConf)
+
+  override def properties(): util.Map[String, String] = Map.empty[String, 
String].asJava
+
+  private def isValidSchema(schema: StructType): Boolean = {
+    if (schema.fieldNames.toSeq != Seq("key", "value")) {
+      false
+    } else if (!SchemaUtil.getSchemaAsDataType(schema, 
"key").isInstanceOf[StructType]) {
+      false
+    } else if (!SchemaUtil.getSchemaAsDataType(schema, 
"value").isInstanceOf[StructType]) {
+      false
+    } else {
+      true
+    }
+  }
+
+  override def metadataColumns(): Array[MetadataColumn] = 
METADATA_COLUMNS.toArray
+}
+
+/**
+ * Companion object for StateTable class to place constants and nested objects.
+ * Currently storing capability of the table and the definition of metadata 
column(s).
+ */
+object StateTable {
+  private val CAPABILITY = Set(TableCapability.BATCH_READ).asJava
+
+  val METADATA_COLUMNS: Seq[MetadataColumn] = Seq(PartitionId)
+
+  private object PartitionId extends MetadataColumn {
+    override def name(): String = "_partition_id"
+
+    override def dataType(): DataType = IntegerType
+
+    override def isNullable: Boolean = false
+
+    override def comment(): String = {
+      "Represents an ID for a physical state partition this row belongs to."
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
new file mode 100644
index 000000000000..7b08c289fcc4
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import org.apache.spark.sql.SparkSession
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinSide
+import 
org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker,
 StateStore, StateStoreId, StateStoreProviderId, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * A helper object to read the state schema for stream-stream join.
+ *
+ * The parameter `excludeAuxColumns` in methods represents whether the result 
schema should
+ * include the columns the operator added in addition to the input schema.
+ */
+object StreamStreamJoinStateHelper {
+  def readSchema(
+      session: SparkSession,
+      stateCheckpointLocation: String,
+      operatorId: Int,
+      side: JoinSide,
+      excludeAuxColumns: Boolean = true): StructType = {
+    val (keySchema, valueSchema) = readKeyValueSchema(session, 
stateCheckpointLocation,
+      operatorId, side, excludeAuxColumns)
+
+    new StructType()
+      .add("key", keySchema)
+      .add("value", valueSchema)
+  }
+
+  def readKeyValueSchema(
+      session: SparkSession,
+      stateCheckpointLocation: String,
+      operatorId: Int,
+      side: JoinSide,
+      excludeAuxColumns: Boolean = true): (StructType, StructType) = {
+
+    // KeyToNumValuesType, KeyWithIndexToValueType
+    val storeNames = 
SymmetricHashJoinStateManager.allStateStoreNames(side).toList
+
+    val partitionId = StateStore.PARTITION_ID_TO_CHECK_SCHEMA
+    val storeIdForKeyToNumValues = new StateStoreId(stateCheckpointLocation, 
operatorId,
+      partitionId, storeNames(0))
+    val providerIdForKeyToNumValues = new 
StateStoreProviderId(storeIdForKeyToNumValues,
+      UUID.randomUUID())
+
+    val storeIdForKeyWithIndexToValue = new 
StateStoreId(stateCheckpointLocation,
+      operatorId, partitionId, storeNames(1))
+    val providerIdForKeyWithIndexToValue = new 
StateStoreProviderId(storeIdForKeyWithIndexToValue,
+      UUID.randomUUID())
+
+    val newHadoopConf = session.sessionState.newHadoopConf()
+
+    val manager = new 
StateSchemaCompatibilityChecker(providerIdForKeyToNumValues, newHadoopConf)
+    val (keySchema, _) = manager.readSchemaFile()
+
+    val manager2 = new 
StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
+      newHadoopConf)
+    val (_, valueSchema) = manager2.readSchemaFile()
+
+    val maybeMatchedColumn = valueSchema.last
+
+    if (excludeAuxColumns
+        && maybeMatchedColumn.name == "matched"
+        && maybeMatchedColumn.dataType == BooleanType) {
+      // remove internal column `matched` for format version 2
+      (keySchema, StructType(valueSchema.dropRight(1)))
+    } else {
+      (keySchema, valueSchema)
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
new file mode 100644
index 000000000000..1a3d42aa0661
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GenericInternalRow, JoinedRow, Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+import 
org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{JoinSide,
 LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, 
SymmetricHashJoinStateManager}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * An implementation of [[PartitionReaderFactory]] for State Store data 
source, specifically
+ * to build a [[PartitionReader]] for reading the state from stream-stream 
join.
+ */
+class StreamStreamJoinStatePartitionReaderFactory(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    userFacingSchema: StructType,
+    stateSchema: StructType) extends PartitionReaderFactory {
+  override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
+    new StreamStreamJoinStatePartitionReader(storeConf, hadoopConf,
+      partition.asInstanceOf[StateStoreInputPartition], userFacingSchema, 
stateSchema)
+  }
+}
+
+/**
+ * An implementation of [[PartitionReader]] for State Store data source, 
specifically to read
+ * the partition for the state from stream-stream join.
+ */
+class StreamStreamJoinStatePartitionReader(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    userFacingSchema: StructType,
+    stateSchema: StructType) extends PartitionReader[InternalRow] with Logging 
{
+
+  private val keySchema = SchemaUtil.getSchemaAsDataType(stateSchema, "key")
+    .asInstanceOf[StructType]
+  private val valueSchema = SchemaUtil.getSchemaAsDataType(stateSchema, 
"value")
+    .asInstanceOf[StructType]
+
+  private val userFacingValueSchema = 
SchemaUtil.getSchemaAsDataType(userFacingSchema, "value")
+    .asInstanceOf[StructType]
+
+  private val joinSide: JoinSide = partition.sourceOptions.joinSide match {
+    case JoinSideValues.left => LeftSide
+    case JoinSideValues.right => RightSide
+    case JoinSideValues.none =>
+      throw new IllegalStateException("Unexpected join side for stream-stream 
read!")
+  }
+
+  /*
+   * This is to handle the difference of schema across state format versions. 
The major difference
+   * is whether we have added new field(s) in addition to the fields from 
input schema.
+   *
+   * - version 1: no additional field
+   * - version 2: the field "matched" is added to the last
+   */
+  private val (inputAttributes, formatVersion) = {
+    val maybeMatchedColumn = valueSchema.last
+    val (fields, version) = {
+      if (maybeMatchedColumn.name == "matched" && maybeMatchedColumn.dataType 
== BooleanType) {
+        (valueSchema.dropRight(1), 2)
+      } else {
+        (valueSchema, 1)
+      }
+    }
+
+    assert(fields.toArray.sameElements(userFacingValueSchema.fields),
+      "Exposed fields should be same with given user facing schema for value! 
" +
+        s"Exposed fields: ${fields.mkString("(", ", ", ")")} / " +
+        s"User facing value fields: 
${userFacingValueSchema.fields.mkString("(", ", ", ")")}")
+
+    val attrs = fields.map {
+      f => AttributeReference(f.name, f.dataType, f.nullable)()
+    }
+    (attrs, version)
+  }
+
+  private var joinStateManager: SymmetricHashJoinStateManager = _
+
+  private lazy val iter = {
+    if (joinStateManager == null) {
+      val stateInfo = StatefulOperatorStateInfo(
+        partition.sourceOptions.stateCheckpointLocation.toString,
+        partition.queryId, partition.sourceOptions.operatorId,
+        partition.sourceOptions.batchId + 1, -1)
+      joinStateManager = new SymmetricHashJoinStateManager(
+        joinSide,
+        inputAttributes,
+        joinKeys = DataTypeUtils.toAttributes(keySchema),
+        stateInfo = Some(stateInfo),
+        storeConf = storeConf,
+        hadoopConf = hadoopConf.value,
+        partitionId = partition.partition,
+        formatVersion,
+        skippedNullValueCount = None
+      )
+    }
+
+    // state format 2
+    val valueWithMatchedExprs = inputAttributes :+ Literal(true)
+    val indexOrdinalInValueWithMatchedRow = inputAttributes.size
+    val valueWithMatchedRowGenerator = 
UnsafeProjection.create(valueWithMatchedExprs,
+      inputAttributes)
+
+    joinStateManager.iterator.map { pair =>
+      if (formatVersion == 2) {
+        val row = valueWithMatchedRowGenerator(pair.value)
+        row.setBoolean(indexOrdinalInValueWithMatchedRow, pair.matched)
+        unifyStateRowPair(pair.key, row)
+      } else { // formatVersion == 1
+        unifyStateRowPair(pair.key, pair.value)
+      }
+    }
+  }
+
+  private var current: InternalRow = _
+
+  override def next(): Boolean = {
+    if (iter.hasNext) {
+      current = iter.next()
+      true
+    } else {
+      current = null
+      false
+    }
+  }
+
+  private val joinedRow = new JoinedRow()
+
+  private def addMetadata(row: InternalRow): InternalRow = {
+    val metadataRow = new GenericInternalRow(
+      StateTable.METADATA_COLUMNS.map(_.name()).map {
+        case "_partition_id" => partition.partition.asInstanceOf[Any]
+      }.toArray
+    )
+    joinedRow.withLeft(row).withRight(metadataRow)
+  }
+
+  override def get(): InternalRow = addMetadata(current)
+
+  override def close(): Unit = {
+    current = null
+    if (joinStateManager != null) {
+      joinStateManager.abortIfNeeded()
+    }
+  }
+
+  private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
+    val row = new GenericInternalRow(2)
+    row.update(0, pair._1)
+    row.update(1, pair._2)
+    row
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
new file mode 100644
index 000000000000..07f4a4b5bacc
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state.utils
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types.{DataType, StructType}
+
+object SchemaUtil {
+  def getSchemaAsDataType(schema: StructType, fieldName: String): DataType = {
+    schema.getFieldIndex(fieldName) match {
+      case Some(idx) => schema(idx).dataType
+      case _ => throw new AnalysisException(s"field $fieldName not found from 
given schema $schema")
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
new file mode 100644
index 000000000000..932d5b0d75a2
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingCheckpointConstants.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+object StreamingCheckpointConstants {
+  val DIR_NAME_COMMITS = "commits"
+  val DIR_NAME_OFFSETS = "offsets"
+  val DIR_NAME_STATE = "state"
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 80384f8cb3b9..a385c09b38fc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -88,8 +88,7 @@ class StateSchemaCompatibilityChecker(
   private def schemasCompatible(storedSchema: StructType, schema: StructType): 
Boolean =
     DataType.equalsIgnoreNameAndCompatibleNullability(schema, storedSchema)
 
-  // Visible for testing
-  private[sql] def readSchemaFile(): (StructType, StructType) = {
+  def readSchemaFile(): (StructType, StructType) = {
     val inStream = fm.open(schemaFileLocation)
     try {
       val versionStr = inStream.readUTF()
@@ -104,7 +103,7 @@ class StateSchemaCompatibilityChecker(
     }
   }
 
-  private def createSchemaFile(keySchema: StructType, valueSchema: 
StructType): Unit = {
+  def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = 
{
     createSchemaFile(keySchema, valueSchema, schemaWriter)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 9e96da98eb3a..b67c5ffd09a1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.util.Locale
 
+import scala.annotation.tailrec
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.TaskContext
@@ -185,6 +187,57 @@ class SymmetricHashJoinStateManager(
     }
   }
 
+  /**
+   * Perform a full scan to provide all available data.
+   *
+   * This produces an iterator over the (key, value, match) tuples. Callers 
are expected
+   * to consume fully to clean up underlying iterators correctly.
+   */
+  def iterator: Iterator[KeyToValuePair] = {
+    new NextIterator[KeyToValuePair] {
+      // Reuse this object to avoid creation+GC overhead.
+      private val reusedRet = new KeyToValuePair()
+
+      private val allKeyToNumValues = keyToNumValues.iterator
+
+      private var currentKey: UnsafeRow = null
+      private var numValues: Long = 0L
+      private var index: Long = 0L
+
+      @tailrec
+      override def getNext(): KeyToValuePair = {
+        if (currentKey != null) {
+          assert(index < numValues)
+
+          val valueAndMatched = keyWithIndexToValue.get(currentKey, index)
+          index += 1
+
+          reusedRet.withNew(currentKey, valueAndMatched)
+
+          if (index == numValues) {
+            currentKey = null
+            numValues = 0L
+            index = 0L
+          }
+
+          reusedRet
+        } else if (allKeyToNumValues.hasNext) {
+          val newKeyToNumValues = allKeyToNumValues.next()
+          currentKey = newKeyToNumValues.key
+          numValues = newKeyToNumValues.numValue
+          index = 0L
+
+          getNext()
+        } else {
+          finished = true
+          null
+        }
+      }
+
+      override protected def close(): Unit = {}
+    }
+  }
+
   /**
    * Remove using a predicate on values.
    *
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
new file mode 100644
index 000000000000..69573f46e689
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -0,0 +1,779 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.io.{File, FileWriter}
+
+import org.scalatest.Assertions
+
+import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, 
GenericInternalRow}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, 
OffsetSeqLog}
+import 
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, 
RocksDBStateStoreProvider, StateStore}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{IntegerType, StructType}
+
+class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
+  import testImplicits._
+
+  test("ERROR: read the state from stateless query") {
+    withTempDir { tempDir =>
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+        .selectExpr("value", "value % 2 AS value2")
+
+      testStream(df)(
+        StartStream(checkpointLocation = tempDir.getAbsolutePath),
+        AddData(inputData, 1, 2, 3, 4, 5),
+        CheckLastBatch((1, 1), (2, 0), (3, 1), (4, 0), (5, 1)),
+        AddData(inputData, 6, 7, 8),
+        CheckLastBatch((6, 0), (7, 1), (8, 0))
+      )
+
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore").load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: no committed batch on default batch ID") {
+    withTempDir { tempDir =>
+      runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+
+      val offsetLog = new OffsetSeqLog(spark,
+        new File(tempDir.getAbsolutePath, "offsets").getAbsolutePath)
+      val commitLog = new CommitLog(spark,
+        new File(tempDir.getAbsolutePath, "commits").getAbsolutePath)
+
+      offsetLog.purgeAfter(0)
+      commitLog.purgeAfter(-1)
+
+      intercept[IllegalStateException] {
+        spark.read.format("statestore").load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: corrupted state schema file") {
+    withTempDir { tempDir =>
+      runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+
+      def rewriteStateSchemaFileToDummy(): Unit = {
+        // Refer to the StateSchemaCompatibilityChecker for the path of state 
schema file
+        val pathForSchema = Seq(
+          "state", "0", StateStore.PARTITION_ID_TO_CHECK_SCHEMA.toString,
+          "_metadata", "schema"
+        ).foldLeft(tempDir) { case (file, dirName) =>
+          new File(file, dirName)
+        }
+
+        assert(pathForSchema.exists())
+        assert(pathForSchema.delete())
+
+        val fileWriter = new FileWriter(pathForSchema)
+        fileWriter.write("lol dummy corrupted schema file")
+        fileWriter.close()
+
+        assert(pathForSchema.exists())
+      }
+
+      rewriteStateSchemaFileToDummy()
+
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore").load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: path is not specified") {
+    intercept[IllegalArgumentException] {
+      spark.read.format("statestore").load()
+    }
+  }
+
+  test("ERROR: operator ID specified to negative") {
+    withTempDir { tempDir =>
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.OPERATOR_ID, -1)
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: batch ID specified to negative") {
+    withTempDir { tempDir =>
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.BATCH_ID, -1)
+          .load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: store name is empty") {
+    withTempDir { tempDir =>
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.STORE_NAME, "")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: invalid value for joinSide option") {
+    withTempDir { tempDir =>
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.JOIN_SIDE, "both")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: both options `joinSide` and `storeName` are specified") {
+    withTempDir { tempDir =>
+      intercept[IllegalArgumentException] {
+        spark.read.format("statestore")
+          .option(StateSourceOptions.JOIN_SIDE, "right")
+          .option(StateSourceOptions.STORE_NAME, "right-keyToNumValues")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  test("ERROR: trying to read state data as stream") {
+    withTempDir { tempDir =>
+      runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+
+      intercept[SparkUnsupportedOperationException] {
+        spark.readStream.format("statestore").load(tempDir.getAbsolutePath)
+          .writeStream.format("noop").start()
+      }
+    }
+  }
+}
+
+/**
+ * Here we build a combination of test criteria for
+ * 1) number of shuffle partitions
+ * 2) state store provider
+ * 3) compression codec
+ * and run one of the test to verify that above configs work.
+ *
+ * We are building 3 x 2 x 4 = 24 different test criteria, and it's probably 
waste of time
+ * and resource to run all combinations for all times, hence we will randomly 
pick 5 tests
+ * per run.
+ */
+class StateDataSourceSQLConfigSuite extends StateDataSourceTestBase {
+
+  private val TEST_SHUFFLE_PARTITIONS = Seq(1, 3, 5)
+  private val TEST_PROVIDERS = Seq(
+    classOf[HDFSBackedStateStoreProvider].getName,
+    classOf[RocksDBStateStoreProvider].getName
+  )
+  private val TEST_COMPRESSION_CODECS = CompressionCodec.ALL_COMPRESSION_CODECS
+
+  private val ALL_COMBINATIONS = {
+    val comb = for (
+      part <- TEST_SHUFFLE_PARTITIONS;
+      provider <- TEST_PROVIDERS;
+      codec <- TEST_COMPRESSION_CODECS
+    ) yield {
+      (part, provider, codec)
+    }
+    scala.util.Random.shuffle(comb)
+  }
+
+  ALL_COMBINATIONS.take(5).foreach { case (part, provider, codec) =>
+    val testName = s"Verify the read with config 
[part=$part][provider=$provider][codec=$codec]"
+    test(testName) {
+      withTempDir { tempDir =>
+        withSQLConf(
+          SQLConf.SHUFFLE_PARTITIONS.key -> part.toString,
+          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> provider,
+          SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> codec) {
+
+          runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+
+          verifyLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+        }
+      }
+    }
+  }
+
+  test("Use different configs than session config") {
+    withTempDir { tempDir =>
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> "3",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName,
+        SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> "zstd") {
+
+        runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+      }
+
+      // Set the different values in session config, to validate whether state 
data source refers
+      // to the config in offset log.
+      withSQLConf(
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[HDFSBackedStateStoreProvider].getName,
+        SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> "lz4") {
+
+        verifyLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+      }
+    }
+  }
+
+  private def verifyLargeDataStreamingAggregationQuery(checkpointLocation: 
String): Unit = {
+    val operatorId = 0
+    val batchId = 2
+
+    val stateReadDf = spark.read
+      .format("statestore")
+      .option(StateSourceOptions.PATH, checkpointLocation)
+      // explicitly specifying batch ID and operator ID to test out the 
functionality
+      .option(StateSourceOptions.BATCH_ID, batchId)
+      .option(StateSourceOptions.OPERATOR_ID, operatorId)
+      .load()
+
+    val resultDf = stateReadDf
+      .selectExpr("key.groupKey AS key_groupKey", "value.count AS value_cnt",
+        "value.sum AS value_sum", "value.max AS value_max", "value.min AS 
value_min")
+
+    checkAnswer(
+      resultDf,
+      Seq(
+        Row(0, 5, 60, 30, 0), // 0, 10, 20, 30
+        Row(1, 5, 65, 31, 1), // 1, 11, 21, 31
+        Row(2, 5, 70, 32, 2), // 2, 12, 22, 32
+        Row(3, 4, 72, 33, 3), // 3, 13, 23, 33
+        Row(4, 4, 76, 34, 4), // 4, 14, 24, 34
+        Row(5, 4, 80, 35, 5), // 5, 15, 25, 35
+        Row(6, 4, 84, 36, 6), // 6, 16, 26, 36
+        Row(7, 4, 88, 37, 7), // 7, 17, 27, 37
+        Row(8, 4, 92, 38, 8), // 8, 18, 28, 38
+        Row(9, 4, 96, 39, 9) // 9, 19, 29, 39
+      )
+    )
+  }
+}
+
+class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+      classOf[HDFSBackedStateStoreProvider].getName)
+  }
+}
+
+class RocksDBStateDataSourceReadSuite extends StateDataSourceReadSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+      classOf[RocksDBStateStoreProvider].getName)
+    
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
+      "false")
+  }
+}
+
+class RocksDBWithChangelogCheckpointStateDataSourceReaderSuite extends 
StateDataSourceReadSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+      classOf[RocksDBStateStoreProvider].getName)
+    
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
+      "true")
+  }
+}
+
+abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with 
Assertions {
+
+  test("simple aggregation, state ver 1") {
+    testStreamingAggregation(1)
+  }
+
+  test("simple aggregation, state ver 2") {
+    testStreamingAggregation(2)
+  }
+
+  test("composite key aggregation, state ver 1") {
+    testStreamingAggregationWithCompositeKey(1)
+  }
+
+  test("composite key aggregation, state ver 2") {
+    testStreamingAggregationWithCompositeKey(2)
+  }
+
+  private def testStreamingAggregation(stateVersion: Int): Unit = {
+    withSQLConf(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> 
stateVersion.toString) {
+      withTempDir { tempDir =>
+        runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath)
+
+        val operatorId = 0
+        val batchId = 2
+
+        val stateReadDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          // explicitly specifying batch ID and operator ID to test out the 
functionality
+          .option(StateSourceOptions.BATCH_ID, batchId)
+          .option(StateSourceOptions.OPERATOR_ID, operatorId)
+          .load()
+
+        val resultDf = stateReadDf
+          .selectExpr("key.groupKey AS key_groupKey", "value.count AS 
value_cnt",
+            "value.sum AS value_sum", "value.max AS value_max", "value.min AS 
value_min")
+
+        checkAnswer(
+          resultDf,
+          Seq(
+            Row(0, 5, 60, 30, 0), // 0, 10, 20, 30
+            Row(1, 5, 65, 31, 1), // 1, 11, 21, 31
+            Row(2, 5, 70, 32, 2), // 2, 12, 22, 32
+            Row(3, 4, 72, 33, 3), // 3, 13, 23, 33
+            Row(4, 4, 76, 34, 4), // 4, 14, 24, 34
+            Row(5, 4, 80, 35, 5), // 5, 15, 25, 35
+            Row(6, 4, 84, 36, 6), // 6, 16, 26, 36
+            Row(7, 4, 88, 37, 7), // 7, 17, 27, 37
+            Row(8, 4, 92, 38, 8), // 8, 18, 28, 38
+            Row(9, 4, 96, 39, 9) // 9, 19, 29, 39
+          )
+        )
+      }
+    }
+  }
+
+  private def testStreamingAggregationWithCompositeKey(stateVersion: Int): 
Unit = {
+    withSQLConf(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> 
stateVersion.toString) {
+      withTempDir { tempDir =>
+        runCompositeKeyStreamingAggregationQuery(tempDir.getAbsolutePath)
+
+        val stateReadDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          // skip version and operator ID to test out functionalities
+          .load()
+
+        val resultDf = stateReadDf
+          .selectExpr("key.groupKey AS key_groupKey", "key.fruit AS key_fruit",
+            "value.count AS value_cnt", "value.sum AS value_sum", "value.max 
AS value_max",
+            "value.min AS value_min")
+
+        checkAnswer(
+          resultDf,
+          Seq(
+            Row(0, "Apple", 2, 6, 6, 0),
+            Row(1, "Banana", 3, 9, 7, 1),
+            Row(0, "Strawberry", 3, 12, 8, 2),
+            Row(1, "Apple", 3, 15, 9, 3),
+            Row(0, "Banana", 2, 14, 10, 4),
+            Row(1, "Strawberry", 1, 5, 5, 5)
+          )
+        )
+      }
+    }
+  }
+
+  test("dropDuplicates") {
+    withTempDir { tempDir =>
+      runDropDuplicatesQuery(tempDir.getAbsolutePath)
+
+      val stateReadDf = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        // skip version and operator ID to test out functionalities
+        .load()
+
+      val resultDf = stateReadDf
+        .selectExpr("key.value AS key_value", "CAST(key.eventTime AS LONG) AS 
key_eventTime_long")
+
+      checkAnswer(resultDf, Seq(Row(45, 45)))
+
+      val stateReadDf2 = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        .option(StateSourceOptions.BATCH_ID, 0)
+        .load()
+
+      val resultDf2 = stateReadDf2
+        .selectExpr("key.value AS key_value", "CAST(key.eventTime AS LONG) AS 
key_eventTime_long")
+
+      checkAnswer(resultDf2,
+        (10 to 15).map(idx => Row(idx, idx))
+      )
+    }
+  }
+
+  test("dropDuplicates with column specified") {
+    withTempDir { tempDir =>
+      runDropDuplicatesQueryWithColumnSpecified(tempDir.getAbsolutePath)
+
+      val stateReadDf = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        // skip version and operator ID to test out functionalities
+        .load()
+
+      val resultDf = stateReadDf
+        .selectExpr("key.col1 AS key_col1")
+
+      checkAnswer(resultDf, Seq(Row("A"), Row("B"), Row("C"), Row("D")))
+
+      val stateReadDf2 = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        .option(StateSourceOptions.BATCH_ID, 0)
+        .load()
+
+      val resultDf2 = stateReadDf2
+        .selectExpr("key.col1 AS key_col1")
+
+      checkAnswer(resultDf2, Seq(Row("A"), Row("B"), Row("C")))
+    }
+  }
+
+  test("dropDuplicatesWithinWatermark") {
+    withTempDir { tempDir =>
+      runDropDuplicatesWithinWatermarkQuery(tempDir.getAbsolutePath)
+
+      val stateReadDf = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        // skip version and operator ID to test out functionalities
+        .load()
+
+      val resultDf = stateReadDf
+        .selectExpr("key._1 AS key_1", "value.expiresAtMicros AS 
value_expiresAtMicros")
+
+      checkAnswer(resultDf,
+        Seq(Row("b", 24000000), Row("d", 27000000)))
+
+      val stateReadDf2 = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        .option(StateSourceOptions.BATCH_ID, 4)
+        .load()
+
+      val resultDf2 = stateReadDf2
+        .selectExpr("key._1 AS key_1", "value.expiresAtMicros AS 
value_expiresAtMicros")
+
+      checkAnswer(resultDf2,
+        Seq(
+          Row("a", 19000000),
+          Row("b", 24000000),
+          Row("c", 23000000)
+        )
+      )
+    }
+  }
+
+  test("flatMapGroupsWithState, state ver 1") {
+    testFlatMapGroupsWithState(1)
+  }
+
+  test("flatMapGroupsWithState, state ver 2") {
+    testFlatMapGroupsWithState(2)
+  }
+
+  private def testFlatMapGroupsWithState(stateVersion: Int): Unit = {
+    withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> 
stateVersion.toString) {
+      withTempDir { tempDir =>
+        runFlatMapGroupsWithStateQuery(tempDir.getAbsolutePath)
+
+        val stateReadDf = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .load()
+
+        val resultDf = if (stateVersion == 1) {
+          stateReadDf
+            .selectExpr("key.value AS key_value", "value.numEvents AS 
value_numEvents",
+              "value.startTimestampMs AS value_startTimestampMs",
+              "value.endTimestampMs AS value_endTimestampMs",
+              "value.timeoutTimestamp AS value_timeoutTimestamp")
+        } else { // stateVersion == 2
+          stateReadDf
+            .selectExpr("key.value AS key_value", "value.groupState.numEvents 
AS value_numEvents",
+              "value.groupState.startTimestampMs AS value_startTimestampMs",
+              "value.groupState.endTimestampMs AS value_endTimestampMs",
+              "value.timeoutTimestamp AS value_timeoutTimestamp")
+        }
+
+        checkAnswer(
+          resultDf,
+          Seq(
+            Row("hello", 4, 1000, 4000, 12000),
+            Row("world", 2, 1000, 3000, 12000),
+            Row("scala", 2, 2000, 4000, 12000)
+          )
+        )
+
+        // try to read the value via case class provided in actual query
+        implicit val encoder = Encoders.product[SessionInfo]
+        val df = if (stateVersion == 1) {
+          
stateReadDf.selectExpr("value.*").drop("timeoutTimestamp").as[SessionInfo]
+        } else { // state version == 2
+          stateReadDf.selectExpr("value.groupState.*").as[SessionInfo]
+        }
+
+        val expected = Array(
+          SessionInfo(4, 1000, 4000),
+          SessionInfo(2, 1000, 3000),
+          SessionInfo(2, 2000, 4000)
+        )
+        assert(df.collect().toSet === expected.toSet)
+      }
+    }
+  }
+
+  test("stream-stream join, state ver 1") {
+    testStreamStreamJoin(1)
+  }
+
+  test("stream-stream join, state ver 2") {
+    testStreamStreamJoin(2)
+  }
+
+  private def testStreamStreamJoin(stateVersion: Int): Unit = {
+    def assertInternalColumnIsNotExposed(df: DataFrame): Unit = {
+      val valueSchema = SchemaUtil.getSchemaAsDataType(df.schema, "value")
+        .asInstanceOf[StructType]
+
+      intercept[AnalysisException] {
+        SchemaUtil.getSchemaAsDataType(valueSchema, "matched")
+      }
+    }
+
+    withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> 
stateVersion.toString) {
+      withTempDir { tempDir =>
+        runStreamStreamJoinQuery(tempDir.getAbsolutePath)
+        val stateReaderForLeft = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.JOIN_SIDE, "left")
+
+        val stateReadDfForLeft = stateReaderForLeft.load()
+        assertInternalColumnIsNotExposed(stateReadDfForLeft)
+
+        val resultDf = stateReadDfForLeft
+          .selectExpr("key.field0 As key_0", "value.leftId AS leftId",
+            "CAST(value.leftTime AS integer) AS leftTime")
+
+        checkAnswer(
+          resultDf,
+          Seq(Row(2, 2, 2L), Row(4, 4, 4L), Row(6, 6, 6L), Row(8, 8, 8L), 
Row(10, 10, 10L))
+        )
+
+        val stateReaderForRight = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.JOIN_SIDE, "right")
+
+        val stateReadDfForRight = stateReaderForRight.load()
+        assertInternalColumnIsNotExposed(stateReadDfForRight)
+
+        val resultDf2 = stateReadDfForRight
+          .selectExpr("key.field0 As key_0", "value.rightId AS rightId",
+            "CAST(value.rightTime AS integer) AS rightTime")
+
+        checkAnswer(
+          resultDf2,
+          Seq(Row(6, 6, 6L), Row(8, 8, 8L), Row(10, 10, 10L))
+        )
+
+        val stateReaderForRightKeyToNumValues = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.STORE_NAME,
+            "right-keyToNumValues")
+
+        val stateReadDfForRightKeyToNumValues = 
stateReaderForRightKeyToNumValues.load()
+        val resultDf3 = stateReadDfForRightKeyToNumValues
+          .selectExpr("key.field0 AS key_0", "value.value")
+
+        checkAnswer(
+          resultDf3,
+          Seq(Row(6, 1L), Row(8, 1L), Row(10, 1L))
+        )
+
+        val stateReaderForRightKeyWithIndexToValue = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.STORE_NAME,
+            "right-keyWithIndexToValue")
+
+        val stateReadDfForRightKeyWithIndexToValue = 
stateReaderForRightKeyWithIndexToValue.load()
+
+        if (stateVersion == 2) {
+          val resultDf4 = stateReadDfForRightKeyWithIndexToValue
+            .selectExpr("key.field0 AS key_0", "key.index AS key_index",
+              "value.rightId AS rightId", "CAST(value.rightTime AS integer) AS 
rightTime",
+              "value.matched As matched")
+
+          checkAnswer(
+            resultDf4,
+            Seq(Row(6, 0, 6, 6L, true), Row(8, 0, 8, 8L, true), Row(10, 0, 10, 
10L, true))
+          )
+        } else {
+          // stateVersion == 1
+          val resultDf4 = stateReadDfForRightKeyWithIndexToValue
+            .selectExpr("key.field0 AS key_0", "key.index AS key_index",
+              "value.rightId AS rightId", "CAST(value.rightTime AS integer) AS 
rightTime")
+
+          checkAnswer(
+            resultDf4,
+            Seq(Row(6, 0, 6, 6L), Row(8, 0, 8, 8L), Row(10, 0, 10, 10L))
+          )
+        }
+      }
+    }
+  }
+
+  test("metadata column") {
+    withTempDir { tempDir =>
+      import testImplicits._
+      val stream = MemoryStream[Int]
+
+      val df = stream.toDF()
+        .groupBy("value")
+        .count()
+
+      stream.addData(1 to 10000: _*)
+
+      val query = df.writeStream.format("noop")
+        .option("checkpointLocation", tempDir.getAbsolutePath)
+        .outputMode(OutputMode.Update())
+        .start()
+
+      query.processAllAvailable()
+      query.stop()
+
+      val stateReadDf = spark.read
+        .format("statestore")
+        .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+        // skip version and operator ID to test out functionalities
+        .load()
+
+      assert(!stateReadDf.schema.exists(_.name == "_partition_id"),
+      "metadata column should not be exposed until it is explicitly 
specified!")
+
+      val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+
+      val resultDf = stateReadDf
+        .selectExpr("key.value AS key_value", "value.count AS value_count", 
"_partition_id")
+        .where("_partition_id % 2 = 0")
+
+      // NOTE: This is a hash function of distribution for stateful operator.
+      val hash = HashPartitioning(
+        Seq(BoundReference(0, IntegerType, nullable = true)),
+        numShufflePartitions)
+      val partIdExpr = hash.partitionIdExpression
+
+      checkAnswer(resultDf,
+        (1 to 10000).map { idx =>
+          val rowForPartition = new 
GenericInternalRow(Array(idx.asInstanceOf[Any]))
+          Row(idx, 1L, partIdExpr.eval(rowForPartition).asInstanceOf[Int])
+        }.filter { r =>
+          r.getInt(2) % 2 == 0
+        }
+      )
+    }
+  }
+
+  test("metadata column with stream-stream join") {
+    val numShufflePartitions = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+
+    withTempDir { tempDir =>
+      runStreamStreamJoinQueryWithOneThousandInputs(tempDir.getAbsolutePath)
+
+      def assertPartitionIdColumnIsNotExposedByDefault(df: DataFrame): Unit = {
+        assert(!df.schema.exists(_.name == "_partition_id"),
+          "metadata column should not be exposed until it is explicitly 
specified!")
+      }
+
+      def assertPartitionIdColumn(df: DataFrame): Unit = {
+        // NOTE: This is a hash function of distribution for stateful operator.
+        // stream-stream join uses the grouping key for the equality match in 
the join condition.
+        // partitioning is bound to the operator, hence all state stores in 
stream-stream join
+        // will have the same partition ID, regardless of the key in the 
internal state store.
+        val hash = HashPartitioning(
+          Seq(BoundReference(0, IntegerType, nullable = true)),
+          numShufflePartitions)
+        val partIdExpr = hash.partitionIdExpression
+
+        val dfWithPartition = df.selectExpr("key.field0 As key_0", 
"_partition_id")
+          .where("_partition_id % 2 = 0")
+
+        checkAnswer(dfWithPartition,
+          Range.inclusive(2, 1000, 2).map { idx =>
+            val rowForPartition = new 
GenericInternalRow(Array(idx.asInstanceOf[Any]))
+            Row(idx, partIdExpr.eval(rowForPartition).asInstanceOf[Int])
+          }.filter { r =>
+            r.getInt(1) % 2 == 0
+          }
+        )
+      }
+
+      def testForSide(side: String): Unit = {
+        val stateReaderForLeft = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.JOIN_SIDE, side)
+          .load()
+
+        assertPartitionIdColumnIsNotExposedByDefault(stateReaderForLeft)
+        assertPartitionIdColumn(stateReaderForLeft)
+
+        val stateReaderForKeyToNumValues = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.STORE_NAME,
+            s"$side-keyToNumValues")
+          .load()
+
+        
assertPartitionIdColumnIsNotExposedByDefault(stateReaderForKeyToNumValues)
+        assertPartitionIdColumn(stateReaderForKeyToNumValues)
+
+        val stateReaderForKeyWithIndexToValue = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
+          .option(StateSourceOptions.STORE_NAME,
+            s"$side-keyWithIndexToValue")
+          .load()
+
+        
assertPartitionIdColumnIsNotExposedByDefault(stateReaderForKeyWithIndexToValue)
+        assertPartitionIdColumn(stateReaderForKeyWithIndexToValue)
+      }
+
+      testForSide("left")
+      testForSide("right")
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
new file mode 100644
index 000000000000..1fe93f891f43
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.util.StreamManualClock
+
+trait StateDataSourceTestBase extends StreamTest with StateStoreMetricsTest {
+  import testImplicits._
+
+  protected def runCompositeKeyStreamingAggregationQuery(checkpointRoot: 
String): Unit = {
+    val inputData = MemoryStream[Int]
+    val aggregated = getCompositeKeyStreamingAggregationQuery(inputData)
+
+    testStream(aggregated, OutputMode.Update)(
+      StartStream(checkpointLocation = checkpointRoot),
+      // batch 0
+      AddData(inputData, 0 to 5: _*),
+      CheckLastBatch(
+        (0, "Apple", 1, 0, 0, 0),
+        (1, "Banana", 1, 1, 1, 1),
+        (0, "Strawberry", 1, 2, 2, 2),
+        (1, "Apple", 1, 3, 3, 3),
+        (0, "Banana", 1, 4, 4, 4),
+        (1, "Strawberry", 1, 5, 5, 5)
+      ),
+      // batch 1
+      AddData(inputData, 6 to 10: _*),
+      // state also contains (1, "Strawberry", 1, 5, 5, 5) but not updated here
+      CheckLastBatch(
+        (0, "Apple", 2, 6, 6, 0), // 0, 6
+        (1, "Banana", 2, 8, 7, 1), // 1, 7
+        (0, "Strawberry", 2, 10, 8, 2), // 2, 8
+        (1, "Apple", 2, 12, 9, 3), // 3, 9
+        (0, "Banana", 2, 14, 10, 4) // 4, 10
+      ),
+      StopStream,
+      StartStream(checkpointLocation = checkpointRoot),
+      // batch 2
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch(
+        (1, "Banana", 3, 9, 7, 1), // 1, 7, 1
+        (0, "Strawberry", 3, 12, 8, 2), // 2, 8, 2
+        (1, "Apple", 3, 15, 9, 3) // 3, 9, 3
+      )
+    )
+  }
+
+  private def getCompositeKeyStreamingAggregationQuery(
+      inputData: MemoryStream[Int]): Dataset[(Int, String, Long, Long, Int, 
Int)] = {
+    inputData.toDF()
+      .selectExpr("value", "value % 2 AS groupKey",
+        "(CASE value % 3 WHEN 0 THEN 'Apple' WHEN 1 THEN 'Banana' ELSE 
'Strawberry' END) AS fruit")
+      .groupBy($"groupKey", $"fruit")
+      .agg(
+        count("*").as("cnt"),
+        sum("value").as("sum"),
+        max("value").as("max"),
+        min("value").as("min")
+      )
+      .as[(Int, String, Long, Long, Int, Int)]
+  }
+
+  protected def runLargeDataStreamingAggregationQuery(checkpointRoot: String): 
Unit = {
+    val inputData = MemoryStream[Int]
+    val aggregated = getLargeDataStreamingAggregationQuery(inputData)
+
+    // check with more data - leverage full partitions
+    testStream(aggregated, OutputMode.Update)(
+      StartStream(checkpointLocation = checkpointRoot),
+      // batch 0
+      AddData(inputData, 0 until 20: _*),
+      CheckLastBatch(
+        (0, 2, 10, 10, 0), // 0, 10
+        (1, 2, 12, 11, 1), // 1, 11
+        (2, 2, 14, 12, 2), // 2, 12
+        (3, 2, 16, 13, 3), // 3, 13
+        (4, 2, 18, 14, 4), // 4, 14
+        (5, 2, 20, 15, 5), // 5, 15
+        (6, 2, 22, 16, 6), // 6, 16
+        (7, 2, 24, 17, 7), // 7, 17
+        (8, 2, 26, 18, 8), // 8, 18
+        (9, 2, 28, 19, 9) // 9, 19
+      ),
+      // batch 1
+      AddData(inputData, 20 until 40: _*),
+      CheckLastBatch(
+        (0, 4, 60, 30, 0), // 0, 10, 20, 30
+        (1, 4, 64, 31, 1), // 1, 11, 21, 31
+        (2, 4, 68, 32, 2), // 2, 12, 22, 32
+        (3, 4, 72, 33, 3), // 3, 13, 23, 33
+        (4, 4, 76, 34, 4), // 4, 14, 24, 34
+        (5, 4, 80, 35, 5), // 5, 15, 25, 35
+        (6, 4, 84, 36, 6), // 6, 16, 26, 36
+        (7, 4, 88, 37, 7), // 7, 17, 27, 37
+        (8, 4, 92, 38, 8), // 8, 18, 28, 38
+        (9, 4, 96, 39, 9) // 9, 19, 29, 39
+      ),
+      StopStream,
+      StartStream(checkpointLocation = checkpointRoot),
+      // batch 2
+      AddData(inputData, 0, 1, 2),
+      CheckLastBatch(
+        (0, 5, 60, 30, 0), // 0, 10, 20, 30, 0
+        (1, 5, 65, 31, 1), // 1, 11, 21, 31, 1
+        (2, 5, 70, 32, 2) // 2, 12, 22, 32, 2
+      )
+    )
+  }
+
+  private def getLargeDataStreamingAggregationQuery(
+      inputData: MemoryStream[Int]): Dataset[(Int, Long, Long, Int, Int)] = {
+    inputData.toDF()
+      .selectExpr("value", "value % 10 AS groupKey")
+      .groupBy($"groupKey")
+      .agg(
+        count("*").as("cnt"),
+        sum("value").as("sum"),
+        max("value").as("max"),
+        min("value").as("min")
+      )
+      .as[(Int, Long, Long, Int, Int)]
+  }
+
+  protected def runDropDuplicatesQuery(checkpointRoot: String): Unit = {
+    val inputData = MemoryStream[Int]
+    val deduplicated = getDropDuplicatesQuery(inputData)
+
+    testStream(deduplicated, OutputMode.Append())(
+      StartStream(checkpointLocation = checkpointRoot),
+
+      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckAnswer(10 to 15: _*),
+      assertNumStateRows(total = 6, updated = 6),
+
+      AddData(inputData, 25), // Advance watermark to 15 secs, no-data-batch 
drops rows <= 15
+      CheckNewAnswer(25),
+      assertNumStateRows(total = 1, updated = 1),
+
+      AddData(inputData, 10), // Should not emit anything as data less than 
watermark
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 1),
+
+      AddData(inputData, 45), // Advance watermark to 35 seconds, 
no-data-batch drops row 25
+      CheckNewAnswer(45),
+      assertNumStateRows(total = 1, updated = 1)
+    )
+  }
+
+  private def getDropDuplicatesQuery(inputData: MemoryStream[Int]): 
Dataset[Long] = {
+    inputData.toDS()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .select($"eventTime".cast("long").as[Long])
+  }
+
+  protected def runDropDuplicatesQueryWithColumnSpecified(checkpointRoot: 
String): Unit = {
+    val inputData = MemoryStream[(String, Int)]
+    val deduplicated = getDropDuplicatesQueryWithColumnSpecified(inputData)
+
+    testStream(deduplicated, OutputMode.Append())(
+      StartStream(checkpointLocation = checkpointRoot),
+
+      AddData(inputData, ("A", 1), ("B", 2), ("C", 3)),
+      CheckAnswer(("A", 1), ("B", 2), ("C", 3)),
+      assertNumStateRows(total = 3, updated = 3),
+
+      AddData(inputData, ("B", 4), ("D", 5)),
+      CheckNewAnswer(("D", 5)),
+      assertNumStateRows(total = 4, updated = 1)
+    )
+  }
+
+  private def getDropDuplicatesQueryWithColumnSpecified(
+      inputData: MemoryStream[(String, Int)]): Dataset[(String, Int)] = {
+    inputData.toDS()
+      .selectExpr("_1 AS col1", "_2 AS col2")
+      .dropDuplicates("col1")
+      .as[(String, Int)]
+  }
+
+  protected def runDropDuplicatesWithinWatermarkQuery(checkpointRoot: String): 
Unit = {
+    val inputData = MemoryStream[(String, Int)]
+    val deduplicated = getDropDuplicatesWithinWatermarkQuery(inputData)
+
+    testStream(deduplicated, OutputMode.Append())(
+      StartStream(checkpointLocation = checkpointRoot),
+
+      // Advances watermark to 15
+      AddData(inputData, "a" -> 17),
+      CheckNewAnswer("a" -> 17),
+      // expired time is set to 19
+      assertNumStateRows(total = 1, updated = 1),
+
+      // Watermark does not advance
+      AddData(inputData, "a" -> 16),
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0),
+
+      // Watermark does not advance
+      // Should not emit anything as data less than watermark
+      AddData(inputData, "a" -> 13),
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 1),
+
+      // Advances watermark to 20. no-data batch drops state row ("a" -> 19)
+      AddData(inputData, "b" -> 22, "c" -> 21),
+      CheckNewAnswer("b" -> 22, "c" -> 21),
+      // expired time is set to 24 and 23
+      assertNumStateRows(total = 2, updated = 2),
+
+      // Watermark does not advance
+      AddData(inputData, "a" -> 21),
+      // "a" is identified as new event since previous batch dropped state row 
("a" -> 19)
+      CheckNewAnswer("a" -> 21),
+      // expired time is set to 23
+      assertNumStateRows(total = 3, updated = 1),
+
+      // Advances watermark to 23. no-data batch drops state row ("a" -> 23), 
("c" -> 23)
+      AddData(inputData, "d" -> 25),
+      CheckNewAnswer("d" -> 25),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  private def getDropDuplicatesWithinWatermarkQuery(
+      inputData: MemoryStream[(String, Int)]): DataFrame = {
+    inputData.toDS()
+      .withColumn("eventTime", timestamp_seconds($"_2"))
+      .withWatermark("eventTime", "2 seconds")
+      .dropDuplicatesWithinWatermark("_1")
+      .select($"_1", $"eventTime".cast("long").as[Long])
+  }
+
+  protected def runFlatMapGroupsWithStateQuery(checkpointRoot: String): Unit = 
{
+    val clock = new StreamManualClock
+
+    val inputData = MemoryStream[(String, Long)]
+    val remapped = getFlatMapGroupsWithStateQuery(inputData)
+
+    testStream(remapped, OutputMode.Update)(
+      // batch 0
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock,
+        checkpointLocation = checkpointRoot),
+      AddData(inputData, ("hello world", 1L), ("hello scala", 2L)),
+      AdvanceManualClock(1 * 1000),
+      CheckNewAnswer(
+        ("hello", 2, 1000, false),
+        ("world", 1, 0, false),
+        ("scala", 1, 0, false)
+      ),
+      // batch 1
+      AddData(inputData, ("hello world", 3L), ("hello scala", 4L)),
+      AdvanceManualClock(1 * 1000),
+      CheckNewAnswer(
+        ("hello", 4, 3000, false),
+        ("world", 2, 2000, false),
+        ("scala", 2, 2000, false)
+      )
+    )
+  }
+
+  private def getFlatMapGroupsWithStateQuery(
+      inputData: MemoryStream[(String, Long)]): Dataset[(String, Int, Long, 
Boolean)] = {
+    // scalastyle:off line.size.limit
+    // This test code is borrowed from Sessionization example, with 
modification a bit to run with testStream
+    // 
https://github.com/apache/spark/blob/v2.4.1/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
+    // scalastyle:on
+
+    val events = inputData.toDF()
+      .as[(String, Timestamp)]
+      .flatMap { case (line, timestamp) =>
+        line.split(" ").map(word => Event(sessionId = word, timestamp))
+      }
+
+    val sessionUpdates = events
+      .groupByKey(event => event.sessionId)
+      .mapGroupsWithState[SessionInfo, 
SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
+
+        case (sessionId: String, events: Iterator[Event], state: 
GroupState[SessionInfo]) =>
+          if (state.hasTimedOut) {
+            val finalUpdate =
+              SessionUpdate(sessionId, state.get.durationMs, 
state.get.numEvents, expired = true)
+            state.remove()
+            finalUpdate
+          } else {
+            val timestamps = events.map(_.timestamp.getTime).toSeq
+            val updatedSession = if (state.exists) {
+              val oldSession = state.get
+              SessionInfo(
+                oldSession.numEvents + timestamps.size,
+                oldSession.startTimestampMs,
+                math.max(oldSession.endTimestampMs, timestamps.max))
+            } else {
+              SessionInfo(timestamps.size, timestamps.min, timestamps.max)
+            }
+            state.update(updatedSession)
+
+            state.setTimeoutDuration("10 seconds")
+            SessionUpdate(sessionId, state.get.durationMs, 
state.get.numEvents, expired = false)
+          }
+      }
+
+    sessionUpdates.map(si => (si.id, si.numEvents, si.durationMs, si.expired))
+  }
+
+  protected def runStreamStreamJoinQuery(checkpointRoot: String): Unit = {
+    val inputData = MemoryStream[(Int, Long)]
+    val query = getStreamStreamJoinQuery(inputData)
+
+    testStream(query)(
+      StartStream(checkpointLocation = checkpointRoot),
+      AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+      // batch 1 - global watermark = 0
+      // states
+      // left: (2, 2L), (4, 4L)
+      // right: (2, 2L), (4, 4L)
+      CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
+      // filter is applied to both sides as an optimization
+      assertNumStateRows(4, 4),
+      AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
+      // batch 2 - global watermark = 5
+      // states
+      // left: (2, 2L), (4, 4L), (6, 6L), (8, 8L), (10, 10L)
+      // right: (6, 6L), (8, 8L), (10, 10L)
+      // states evicted
+      // left: nothing (it waits for 5 seconds more than watermark due to join 
condition)
+      // right: (2, 2L), (4, 4L)
+      // NOTE: look for evicted rows in right which are not evicted from left 
- they were
+      // properly joined in batch 1
+      CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)),
+      assertNumStateRows(8, 6)
+    )
+  }
+
+  protected def runStreamStreamJoinQueryWithOneThousandInputs(checkpointRoot: 
String): Unit = {
+    val inputData = MemoryStream[(Int, Long)]
+    val query = getStreamStreamJoinQuery(inputData)
+
+    // To ease of tests, we do not run a no-data microbatch to not advance 
watermark.
+    withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> 
"false") {
+      testStream(query)(
+        StartStream(checkpointLocation = checkpointRoot),
+        AddData(inputData,
+          (1 to 1000).map { i => (i, i.toLong) }: _*),
+        ProcessAllAvailable(),
+        // filter is applied to both sides as an optimization
+        assertNumStateRows(1000, 1000)
+      )
+    }
+  }
+
+  private def getStreamStreamJoinQuery(inputStream: MemoryStream[(Int, 
Long)]): DataFrame = {
+    val df = inputStream.toDS()
+      .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp"))
+
+    val leftStream = df.select(col("value").as("leftId"), 
col("timestamp").as("leftTime"))
+
+    val rightStream = df
+      // Introduce misses for ease of debugging
+      .where(col("value") % 2 === 0)
+      .select(col("value").as("rightId"), col("timestamp").as("rightTime"))
+
+    leftStream
+      .withWatermark("leftTime", "5 seconds")
+      .join(
+        rightStream.withWatermark("rightTime", "5 seconds"),
+        expr("rightId = leftId AND rightTime >= leftTime AND " +
+          "rightTime <= leftTime + interval 5 seconds"),
+        joinType = "inner")
+      .select(col("leftId"), col("leftTime").cast("int"),
+        col("rightId"), col("rightTime").cast("int"))
+  }
+}
+
+case class Event(sessionId: String, timestamp: Timestamp)
+
+case class SessionInfo(
+    numEvents: Int,
+    startTimestampMs: Long,
+    endTimestampMs: Long) {
+  def durationMs: Long = endTimestampMs - startTimestampMs
+}
+
+case class SessionUpdate(
+    id: String,
+    durationMs: Long,
+    numEvents: Int,
+    expired: Boolean)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to