This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b2601be71659 [SPARK-55131][SS] Change the default merge operator 
delimiter for RocksDB to empty string to concat without delimiter
b2601be71659 is described below

commit b2601be716598f8cc5326382f46b4d0c855c5220
Author: Jungtaek Lim <[email protected]>
AuthorDate: Fri Feb 6 21:54:12 2026 +0900

    [SPARK-55131][SS] Change the default merge operator delimiter for RocksDB 
to empty string to concat without delimiter
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to change the default delimiter for the merge operator of 
RocksDB to an empty string, so that merge operation does not add a delimiter 
and concat two without any character.
    
    Changing the delimiter isn't compatible with existing checkpoints, so this 
change is coupled with SQLConf, with known offset log metadata trick, to apply 
the change only for new streaming queries.
    
    * New SQL config: 
`spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion`
    * Default: 2 ('' as delimiter)
    * Default for existing checkpoints: 1 (',' as delimiter)
    
    ### Why are the changes needed?
    
    We found out there is no way to distinguish two cases of 1) put against 
non-existence value then merge and 2) merge against non-existence value then 
merge, from the current delimiter. There has been an "implication" that 
operators do ensure they call merge only when they know the operation is 
against existing key. This effectively requires GET operation which can be an 
outstanding performance impact depending on the logic.
    
    Making delimiter to an empty string (none) would eliminate the difference 
between the two cases, allowing operators to perform blind merge without 
checking the existence of the key.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the change is internal and there is no user-facing change.
    
    ### How was this patch tested?
    
    Added UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored by claude-4.5-sonnet
    
    Closes #54083 from HeartSaVioR/SPARK-55131.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  15 ++
 .../streaming/checkpointing/OffsetSeq.scala        |   6 +-
 .../sql/execution/streaming/state/RocksDB.scala    |  52 +++++--
 .../streaming/state/RocksDBStateEncoder.scala      |  11 +-
 .../state/RocksDBStateStoreProvider.scala          |  10 +-
 .../streaming/state/StateStoreRowChecksum.scala    |  20 ++-
 .../execution/streaming/OffsetSeqLogSuite.scala    |  41 +++++
 .../streaming/state/RocksDBStateStoreSuite.scala   | 110 +++++++++++++-
 .../execution/streaming/state/RocksDBSuite.scala   | 165 ++++++++++++++++++---
 9 files changed, 375 insertions(+), 55 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7973c652a695..f17199547665 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3045,6 +3045,21 @@ object SQLConf {
       // 5 is the default table format version for RocksDB 6.20.3.
       .createWithDefault(5)
 
+  /**
+   * Note: this is defined in `RocksDBConf.MERGE_OPERATOR_VERSION_CONF`. These 
two places should
+   * be updated together.
+   */
+  val STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion")
+      .internal()
+      .doc("Set the RocksDB merge operator version. This will be stored in the 
checkpoint when " +
+        "starting a streaming query. The checkpoint will use this merge 
operator version for " +
+        "the entire lifetime of the query.")
+      .version("4.2.0")
+      .intConf
+      .checkValue(v => v == 1 || v == 2, "Must be 1 or 2")
+      .createWithDefault(2)
+
   val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
     buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
       .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index c6245ba1b2a9..23c266e82891 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -200,7 +200,8 @@ object OffsetSeqMetadata extends Logging {
     STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
     FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, 
STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
     STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
-    STATE_STORE_ROCKSDB_FORMAT_VERSION, 
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
+    STATE_STORE_ROCKSDB_FORMAT_VERSION, 
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION,
+    STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, 
STREAMING_STATE_STORE_ENCODING_FORMAT,
     STATE_STORE_ROW_CHECKSUM_ENABLED
   )
@@ -249,7 +250,8 @@ object OffsetSeqMetadata extends Logging {
     STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false",
     PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true",
     STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow",
-    STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false"
+    STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false",
+    STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1"
   )
 
   def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey: 
ConfigEntry[T]): String = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 8d6ae2a180c4..69a7e9618bb3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -127,7 +127,20 @@ class RocksDB(
   rocksDbOptions.setMaxOpenFiles(conf.maxOpenFiles)
   rocksDbOptions.setAllowFAllocate(conf.allowFAllocate)
   rocksDbOptions.setAvoidFlushDuringShutdown(true)
-  rocksDbOptions.setMergeOperator(new StringAppendOperator())
+  // Set merge operator based on version for backward compatibility
+  // Version 1: comma delimiter ",", Version 2: empty string ""
+  // NOTE: RocksDB does not document about the behavior of an empty string 
delimiter,
+  // only the PR description explains this - 
https://github.com/facebook/rocksdb/pull/8536
+  val mergeDelimiter = conf.mergeOperatorVersion match {
+    case 1 => ","
+    case 2 => ""
+    case v => throw new IllegalArgumentException(
+      s"Invalid merge operator version: $v. Supported versions are 1 and 2")
+  }
+  rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))
+
+  // Store delimiter size for use in encode/decode operations
+  private[state] val delimiterSize = mergeDelimiter.length
 
   if (conf.boundedMemoryUsage) {
     rocksDbOptions.setWriteBufferManager(writeBufferManager)
@@ -1041,7 +1054,7 @@ class RocksDB(
     val (finalKey, value) = getValue(key, cfName)
     if (conf.rowChecksumEnabled && value != null) {
       KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
-        readVerifier, finalKey, value)
+        readVerifier, finalKey, value, delimiterSize)
     } else {
       value
     }
@@ -1076,7 +1089,7 @@ class RocksDB(
       valuesList.asScala.iterator.zipWithIndex.map {
         case (value, idx) if value != null =>
           KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
-            readVerifier, finalKeys(idx), value)
+            readVerifier, finalKeys(idx), value, delimiterSize)
         case _ => null
       }
     } else {
@@ -1118,7 +1131,7 @@ class RocksDB(
 
     val (finalKey, value) = getValue(key, cfName)
     KeyValueChecksumEncoder.decodeAndVerifyMultiValueRowWithChecksum(
-      readVerifier, finalKey, value)
+      readVerifier, finalKey, value, delimiterSize)
   }
 
   /** Returns a tuple of the final key used to store the value in the db and 
the value. */
@@ -1234,12 +1247,12 @@ class RocksDB(
     } else {
       values
     }
-    // Delimit each value row bytes with a single byte delimiter, the last
+    // Delimit each value row bytes with delimiter, the last
     // value row won't have a delimiter at the end.
     val delimiterNum = valueWithChecksum.length - 1
     // The bytes in valueWithChecksum already include the bytes length prefix
     val totalSize = valueWithChecksum.map(_.length).sum +
-      delimiterNum // for each delimiter
+      delimiterNum * delimiterSize // for each delimiter
 
     val result = new Array[Byte](totalSize)
     var pos = Platform.BYTE_ARRAY_OFFSET
@@ -1249,12 +1262,14 @@ class RocksDB(
       Platform.copyMemory(rowBytes, Platform.BYTE_ARRAY_OFFSET, result, pos, 
rowBytes.length)
       pos += rowBytes.length
 
-      // Add the delimiter - we are using "," as the delimiter
-      if (idx < delimiterNum) {
-        result(pos - Platform.BYTE_ARRAY_OFFSET) = 44.toByte
+      // Add the delimiter if not the last value and delimiter is not empty
+      if (idx < delimiterNum && delimiterSize > 0) {
+        mergeDelimiter.getBytes.zipWithIndex.foreach { case (b, i) =>
+          result(pos - Platform.BYTE_ARRAY_OFFSET + i) = b
+        }
       }
       // Move the position for delimiter
-      pos += 1
+      pos += delimiterSize
     }
     result
   }
@@ -1474,7 +1489,7 @@ class RocksDB(
 
           val value = if (conf.rowChecksumEnabled) {
             KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
-              readVerifier, iter.key, iter.value)
+              readVerifier, iter.key, iter.value, delimiterSize)
           } else {
             iter.value
           }
@@ -1572,7 +1587,7 @@ class RocksDB(
 
           val value = if (conf.rowChecksumEnabled) {
             KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
-              readVerifier, iter.key, iter.value)
+              readVerifier, iter.key, iter.value, delimiterSize)
           } else {
             iter.value
           }
@@ -2391,6 +2406,7 @@ case class RocksDBConf(
     fileChecksumEnabled: Boolean,
     rowChecksumEnabled: Boolean,
     rowChecksumReadVerificationRatio: Long,
+    mergeOperatorVersion: Int,
     stateStoreConf: StateStoreConf)
 
 object RocksDBConf {
@@ -2498,6 +2514,17 @@ object RocksDBConf {
   private val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF =
     SQLConfEntry(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY, "true")
 
+  // Configuration to set the merge operator version for backward 
compatibility.
+  // Version 1 (default): Uses comma "," as delimiter for StringAppendOperator
+  // Version 2: Uses empty string "" as delimiter (no delimiter, direct 
concatenation)
+  //
+  // Note: this is also defined in 
`SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`.
+  // These two places should be updated together.
+  val MERGE_OPERATOR_VERSION_CONF_KEY = "mergeOperatorVersion"
+  private val MERGE_OPERATOR_VERSION_CONF = 
SQLConfEntry(MERGE_OPERATOR_VERSION_CONF_KEY, "2")
+
+  val MERGE_OPERATOR_VALID_VERSIONS: Seq[Int] = Seq(1, 2)
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -2594,6 +2621,7 @@ object RocksDBConf {
       storeConf.checkpointFileChecksumEnabled,
       storeConf.rowChecksumEnabled,
       storeConf.rowChecksumReadVerificationRatio,
+      getPositiveIntConf(MERGE_OPERATOR_VERSION_CONF),
       storeConf)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 102f38443b8b..4111e7b42623 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -1350,14 +1350,16 @@ object RocksDBStateEncoder extends Logging {
    * @param valueSchema Schema defining the structure of values to be encoded
    * @param useMultipleValuesPerKey If true, creates an encoder that can 
handle multiple values
    *                                per key; if false, creates an encoder for 
single values
+   * @param delimiterSize Size of the delimiter used between multiple values 
(in bytes)
    * @return A configured RocksDBValueStateEncoder instance
    */
   def getValueEncoder(
       dataEncoder: RocksDBDataEncoder,
       valueSchema: StructType,
-      useMultipleValuesPerKey: Boolean): RocksDBValueStateEncoder = {
+      useMultipleValuesPerKey: Boolean,
+      delimiterSize: Int = 1): RocksDBValueStateEncoder = {
     if (useMultipleValuesPerKey) {
-      new MultiValuedStateEncoder(dataEncoder, valueSchema)
+      new MultiValuedStateEncoder(dataEncoder, valueSchema, delimiterSize)
     } else {
       new SingleValueStateEncoder(dataEncoder, valueSchema)
     }
@@ -1726,7 +1728,8 @@ class NoPrefixKeyStateEncoder(
  */
 class MultiValuedStateEncoder(
     dataEncoder: RocksDBDataEncoder,
-    valueSchema: StructType)
+    valueSchema: StructType,
+    delimiterSize: Int)
   extends RocksDBValueStateEncoder with Logging {
 
   override def encodeValue(row: UnsafeRow): Array[Byte] = {
@@ -1786,7 +1789,7 @@ class MultiValuedStateEncoder(
             numBytes
           )
           pos += numBytes
-          pos += 1 // eat the delimiter character
+          pos += delimiterSize // eat the delimiter based on actual delimiter 
size
           dataEncoder.decodeValue(encodedValue)
         }
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 7494c12d028f..6dde67dd5b66 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -237,7 +237,8 @@ private[sql] class RocksDBStateStoreProvider
       val valueEncoder = RocksDBStateEncoder.getValueEncoder(
         dataEncoder,
         valueSchema,
-        useMultipleValuesPerKey
+        useMultipleValuesPerKey,
+        rocksDB.delimiterSize
       )
       keyValueEncoderMap.putIfAbsent(colFamilyName, (keyEncoder, valueEncoder, 
cfId))
     }
@@ -733,7 +734,8 @@ private[sql] class RocksDBStateStoreProvider
     val valueEncoder = RocksDBStateEncoder.getValueEncoder(
       dataEncoder,
       valueSchema,
-      useMultipleValuesPerKey
+      useMultipleValuesPerKey,
+      rocksDB.delimiterSize
     )
 
     var cfId: Short = 0
@@ -1443,7 +1445,7 @@ class RocksDBStateStoreChangeDataReader(
           val valueBytes = if (storeConf.rowChecksumEnabled &&
             nextRecord._1 != RecordType.DELETE_RECORD) {
             KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
-              readVerifier, keyBytes, nextRecord._3)
+              readVerifier, keyBytes, nextRecord._3, rocksDB.delimiterSize)
           } else {
             nextRecord._3
           }
@@ -1466,7 +1468,7 @@ class RocksDBStateStoreChangeDataReader(
             (nextRecord._1, key, nextRecord._3)
           case _ =>
             val value = 
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
-              readVerifier, nextRecord._2, nextRecord._3)
+              readVerifier, nextRecord._2, nextRecord._3, 
rocksDB.delimiterSize)
             (nextRecord._1, nextRecord._2, value)
         }
       } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
index f02a598600c3..88f2e407bccf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
@@ -260,12 +260,14 @@ object KeyValueChecksumEncoder {
    * @param verifier used for checksum verification.
    * @param keyBytes Key bytes for the value to decode, only used for checksum 
verification.
    * @param valueBytes The value bytes to decode.
+   * @param delimiterSize Size of delimiter used between merged values (in 
bytes).
    * @return The original value row bytes, without the checksum.
    * */
   def decodeAndVerifyValueRowWithChecksum(
       verifier: Option[KeyValueIntegrityVerifier],
       keyBytes: Array[Byte],
-      valueBytes: Array[Byte]): Array[Byte] = {
+      valueBytes: Array[Byte],
+      delimiterSize: Int): Array[Byte] = {
     // First get the total size of the original values
     // Doing this to also support decoding merged values (via merge) e.g. 
val1,val2,val3
     val valuesEnd = Platform.BYTE_ARRAY_OFFSET + valueBytes.length
@@ -276,14 +278,14 @@ object KeyValueChecksumEncoder {
       // skip the checksum (first 4 bytes)
       currentPosition += java.lang.Integer.BYTES
       val valueRowSize = Platform.getInt(valueBytes, currentPosition)
-      // move to the next value and skip the delimiter character used for 
rocksdb merge
-      currentPosition += java.lang.Integer.BYTES + valueRowSize + 1
+      // move to the next value and skip the delimiter bytes used for rocksdb 
merge
+      currentPosition += java.lang.Integer.BYTES + valueRowSize + delimiterSize
       resultSize += valueRowSize
       numValues += 1
     }
 
     // include the number of delimiters used for merge
-    resultSize += numValues - 1
+    resultSize += (numValues - 1) * delimiterSize
 
     // now verify and decode to original merged values
     val result = new Array[Byte](resultSize)
@@ -308,7 +310,7 @@ object KeyValueChecksumEncoder {
 
       // No delimiter is needed if single value or the last value in 
multi-value
       val copyLength = if (currentValueCount < numValues) {
-        valueRowSize + 1 // copy the delimiter
+        valueRowSize + delimiterSize // copy the delimiter
       } else {
         valueRowSize
       }
@@ -337,12 +339,14 @@ object KeyValueChecksumEncoder {
    * @param verifier Used for checksum verification.
    * @param keyBytes Key bytes for the value to decode, only used for checksum 
verification.
    * @param valueBytes The value bytes to decode.
+   * @param delimiterSize Size of delimiter used between merged values (in 
bytes).
    * @return Iterator of index range representing the original value row 
bytes, without checksum.
    */
   def decodeAndVerifyMultiValueRowWithChecksum(
       verifier: Option[KeyValueIntegrityVerifier],
       keyBytes: Array[Byte],
-      valueBytes: Array[Byte]): Iterator[ArrayIndexRange[Byte]] = {
+      valueBytes: Array[Byte],
+      delimiterSize: Int): Iterator[ArrayIndexRange[Byte]] = {
     if (valueBytes == null) {
       Seq().iterator
     } else {
@@ -357,8 +361,8 @@ object KeyValueChecksumEncoder {
           val (valueRowIndex, checksum) =
             getValueRowIndexAndChecksum(valueBytes, startingPosition = 
position)
           verifier.foreach(_.verify(keyRowIndex, Some(valueRowIndex), 
checksum))
-          // move to the next value and skip the delimiter character used for 
rocksdb merge
-          position = byteIndexToOffset(valueRowIndex.untilIndex) + 1
+          // move to the next value and skip the delimiter bytes used for 
rocksdb merge
+          position = byteIndexToOffset(valueRowIndex.untilIndex) + 
delimiterSize
           valueRowIndex
         }
       }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 32082cb62bf0..36bf5e2f8313 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -368,6 +368,47 @@ class OffsetSeqLogSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-55131: offset log records defaults to merge operator version 2") 
{
+    val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, 
batchTimestampMs = 0,
+      spark.conf)
+    
assert(offsetSeqMetadata.conf.get(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key)
 ===
+      Some("2"))
+  }
+
+  test("SPARK-55131: offset log uses the merge operator version set in the 
conf") {
+    val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, 
batchTimestampMs = 0,
+      // Trying to set it to non-default value, 1
+      Map(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1"))
+    
assert(offsetSeqMetadata.conf.get(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key)
 ===
+      Some("1"))
+  }
+
+  test("SPARK-55131: Backward compatibility test with merge operator version") 
{
+    // Read from the checkpoint which does not have an entry for merge 
operator version
+    // in its offset log. This should pick up the value to 1 instead of 2.
+    withTempDir { checkpointDir =>
+      val resourceUri = this.getClass.getResource(
+        "/structured-streaming/checkpoint-version-4.0.0-tws-unsaferow/").toURI
+      Utils.copyDirectory(new File(resourceUri), 
checkpointDir.getCanonicalFile)
+
+      val log = new OffsetSeqLog(spark, s"$checkpointDir/offsets")
+      val latestBatchId = log.getLatestBatchId()
+      assert(latestBatchId.isDefined, "No offset log entries found in the 
checkpoint location")
+
+      // Read the latest offset log
+      val offsetSeq = log.get(latestBatchId.get).get
+      val offsetSeqMetadata = offsetSeq.metadataOpt.get
+
+      assert(!offsetSeqMetadata.conf
+        .contains(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key),
+        "Merge operator version should be absent in the offset log entry")
+
+      val clonedSqlConf = spark.sessionState.conf.clone()
+      OffsetSeqMetadata.setSessionConf(offsetSeqMetadata, clonedSqlConf)
+      
assert(clonedSqlConf.getConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION)
 == 1)
+    }
+  }
+
   def testWithOffsetV2(
       testName: String, testTags: Tag*)(testBody: => Any): Unit = {
     super.test(testName, testTags: _*) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 8832d7e09933..3c200b860f52 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -1730,7 +1730,18 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
-  test("validate rocksdb values iterator correctness") {
+  private def testMergeWithOperatorVersions(testName: String)(testFn: Int => 
Unit): Unit = {
+    RocksDBConf.MERGE_OPERATOR_VALID_VERSIONS.foreach { version =>
+      test(testName + s" - merge operator version $version") {
+        withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> 
version.toString) {
+          testFn(version)
+        }
+      }
+    }
+  }
+
+  testMergeWithOperatorVersions(
+    "validate rocksdb values iterator correctness - put then merge") { _ =>
     withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
       tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
         useMultipleValuesPerKey = true)) { provider =>
@@ -1766,6 +1777,103 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("validate rocksdb values iterator correctness - blind merge with 
operator version 2") {
+    withSQLConf(
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2") {
+
+      tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
+        useMultipleValuesPerKey = true)) { provider =>
+        val store = provider.getStore(0)
+        // We do blind merge than put against non-existing key.
+        // Note that this is only safe with merge operator version 2.
+        merge(store, "a", 0, 1)
+
+        val iterator0 = store.valuesIterator(dataToKeyRow("a", 0))
+
+        assert(iterator0.hasNext)
+        assert(valueRowToData(iterator0.next()) === 1)
+        assert(!iterator0.hasNext)
+
+        merge(store, "a", 0, 2)
+        merge(store, "a", 0, 3)
+
+        val iterator1 = store.valuesIterator(dataToKeyRow("a", 0))
+
+        (1 to 3).map { i =>
+          assert(iterator1.hasNext)
+          assert(valueRowToData(iterator1.next()) === i)
+        }
+
+        assert(!iterator1.hasNext)
+
+        store.abort()
+      }
+    }
+  }
+
+  testMergeWithOperatorVersions(
+    "validate rocksdb values iterator correctness - fuzzy merge and put"
+  ) { version =>
+    val seed = System.currentTimeMillis()
+    val rand = new Random(seed)
+    logInfo(s"fuzzy merge and put test using seed: $seed")
+
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
+      tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
+        useMultipleValuesPerKey = true)) { provider =>
+        val store = provider.getStore(0)
+
+        try {
+          val inputValues = scala.collection.mutable.ArrayBuffer[Int]()
+
+          def performPut(value: Int): Unit = {
+            put(store, "a", 0, value)
+            inputValues.clear()
+            inputValues += value.toInt
+          }
+
+          def performMerge(value: Int): Unit = {
+            merge(store, "a", 0, value)
+            inputValues += value.toInt
+          }
+
+          def performRemove(): Unit = {
+            remove(store, _._1 == "a")
+            inputValues.clear()
+          }
+
+          (0 to 100000).foreach { _ =>
+            val op = rand.nextInt(3)
+
+            op match {
+              case 0 =>
+                val value = rand.nextInt(10)
+                performPut(value)
+              case 1 =>
+                val value = rand.nextInt(10)
+                if (inputValues.isEmpty && version == 1) {
+                  // version 1 can't handle blind merge against non-existing 
key, so we have to
+                  // fall back to put
+                  performPut(value)
+                } else {
+                  performMerge(value)
+                }
+              case 2 =>
+                performRemove()
+            }
+
+            val iterator = store.valuesIterator(dataToKeyRow("a", 0))
+            val valuesInStateStore = iterator.map(valueRowToData).toSeq
+            assert(valuesInStateStore === inputValues)
+          }
+        } finally {
+          store.abort()
+        }
+      }
+    }
+  }
+
   /* Column family related tests */
   testWithColumnFamiliesAndEncodingTypes("column family creation with invalid 
names",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 7aced88d3089..bb02dad76ac6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -35,7 +35,7 @@ import org.scalactic.source.Position
 import org.scalatest.PrivateMethodTester
 import org.scalatest.Tag
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, 
SparkIllegalArgumentException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.InternalRow
@@ -1502,7 +1502,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         // Verify merge operation worked
         db.load(0)
         db.load(5)
-        assert(toStr(db.get("merge_key", StateStore.DEFAULT_COL_FAMILY_NAME)) 
=== "base,appended")
+        // SPARK-55131: new merge operation concatenates the strings without 
any separator
+        assert(toStr(db.get("merge_key", StateStore.DEFAULT_COL_FAMILY_NAME)) 
=== "baseappended")
       }
     }
   }
@@ -2178,14 +2179,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }
   }
 
-  test("RocksDB: ensure merge operation correctness") {
+  private def testMergeWithOperatorVersions(testName: String)(testFn: Int => 
Unit): Unit = {
+    RocksDBConf.MERGE_OPERATOR_VALID_VERSIONS.foreach { version =>
+      test(testName + s" - merge operator version $version") {
+        withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> 
version.toString) {
+          testFn(version)
+        }
+      }
+    }
+  }
+
+  private def expectedResultForMerge(inputs: Seq[String], 
mergeOperatorVersion: Int): String = {
+    mergeOperatorVersion match {
+      case 1 => inputs.mkString(",")
+      case 2 => inputs.mkString("")
+    }
+  }
+
+  testMergeWithOperatorVersions("put then merge") { version =>
     withTempDir { dir =>
-      val remoteDir = Utils.createTempDir().toString
       // minDeltasForSnapshot being 5 ensures that only changelog files are 
created
       // for the 3 commits below
       val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false)
-      new File(remoteDir).delete() // to make sure that the directory gets 
created
-      withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
+      withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) { db 
=>
         db.load(0)
         db.put("a", "1")
         db.merge("a", "2")
@@ -2200,12 +2216,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         db.commit()
 
         db.load(1)
-        assert(new String(db.get("a")) === "1,2")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1,2")))
+        val expectedValue = expectedResultForMerge(Seq("1", "2"), version)
+        assert(new String(db.get("a")) === expectedValue)
+        assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
 
         db.load(2)
-        assert(new String(db.get("a")) === "1,2,3")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1,2,3")))
+        val expectedValue2 = expectedResultForMerge(Seq("1", "2", "3"), 
version)
+        assert(new String(db.get("a")) === expectedValue2)
+        assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
 
         db.load(3)
         assert(db.get("a") === null)
@@ -2214,17 +2232,59 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }
   }
 
-  test("RocksDB: ensure putList / mergeList operation correctness") {
+  test("blind merge without put against non-existence key with operator 
version 2") {
+    withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2") 
{
+      withTempDir { dir =>
+        // minDeltasForSnapshot being 5 ensures that only changelog files are 
created
+        // for the 3 commits below
+        val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = 
false)
+        withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) { 
db =>
+          db.load(0)
+          // We don't put "a" first here, we call merge against non-existence 
key
+          // Note that this is only safe with merge operator version 2 - in 
version 1, reader side
+          // can't distinguish the case where the first byte starts with the 
size of element, or
+          // delimiter. Merge operator version 2 concatenates the values 
directly without delimiter
+          // so that the reader side does not need to distinguish the two 
cases.
+          db.merge("a", "1")
+          db.merge("a", "2")
+          db.commit()
+
+          db.load(1)
+          db.remove("a")
+          db.commit()
+
+          db.load(2)
+          db.merge("a", "3")
+          db.merge("a", "4")
+          db.commit()
+
+          db.load(1)
+          val expectedValue = "12"
+          assert(new String(db.get("a")) === expectedValue)
+          assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
+
+          db.load(2)
+          assert(db.get("a") === null)
+          assert(db.iterator().isEmpty)
+
+          db.load(3)
+          val expectedValue2 = "34"
+          assert(new String(db.get("a")) === expectedValue2)
+          assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
+        }
+      }
+    }
+  }
+
+  testMergeWithOperatorVersions("putList then mergeList") { version =>
     withTempDir { dir =>
-      val remoteDir = Utils.createTempDir().toString
       // minDeltasForSnapshot being 5 ensures that only changelog files are 
created
       // for the 3 commits below
       val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false)
-      new File(remoteDir).delete() // to make sure that the directory gets 
created
-      withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
+      withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) { db 
=>
         db.load(0)
-        db.put("a", "1".getBytes)
-        db.mergeList("a", Seq("2", "3", "4").map(_.getBytes).toList)
+        db.putList("a", Seq("1", "2").map(_.getBytes).toList)
+        db.mergeList("a", Seq("3", "4").map(_.getBytes).toList)
         db.commit()
 
         db.load(1)
@@ -2244,26 +2304,83 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         db.commit()
 
         db.load(1)
-        assert(new String(db.get("a")) === "1,2,3,4")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1,2,3,4")))
+        val expectedValue = expectedResultForMerge(Seq("1", "2", "3", "4"), 
version)
+        assert(new String(db.get("a")) === expectedValue)
+        assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
 
         db.load(2)
-        assert(new String(db.get("a")) === "1,2,3,4,5,6")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1,2,3,4,5,6")))
+        val expectedValue2 = expectedResultForMerge(Seq("1", "2", "3", "4", 
"5", "6"), version)
+        assert(new String(db.get("a")) === expectedValue2)
+        assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
 
         db.load(3)
         assert(db.get("a") === null)
         assert(db.iterator().isEmpty)
 
         db.load(4)
-        assert(new String(db.get("a")) === "7,8,9")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "7,8,9")))
+        val expectedValue3 = expectedResultForMerge(Seq("7", "8", "9"), 
version)
+        assert(new String(db.get("a")) === expectedValue3)
+        assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue3)))
 
         db.load(5)
-        assert(new String(db.get("a")) === "10,11")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "10,11")))
+        val expectedValue4 = expectedResultForMerge(Seq("10", "11"), version)
+        assert(new String(db.get("a")) === expectedValue4)
+        assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue4)))
+      }
+    }
+  }
+
+  test("blind mergeList without putList against non-existence key with 
operator version 2") {
+    withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2") 
{
+      withTempDir { dir =>
+        // minDeltasForSnapshot being 5 ensures that only changelog files are 
created
+        // for the 3 commits below
+        val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = 
false)
+        withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) { 
db =>
+          db.load(0)
+          // We don't putList ("a", "b") first here, we call mergeList against 
non-existence key
+          // Note that this is only safe with merge operator version 2, the 
same reason we
+          // described in the prior test.
+          db.mergeList("a", Seq("1", "2").map(_.getBytes).toList)
+          db.mergeList("a", Seq("3", "4").map(_.getBytes).toList)
+          db.commit()
+
+          db.load(1)
+          db.remove("a")
+          db.commit()
+
+          db.load(2)
+          db.mergeList("a", Seq("5").map(_.getBytes).toList)
+          db.mergeList("a", Seq("6", "7", "8").map(_.getBytes).toList)
+          db.commit()
+
+          db.load(1)
+          val expectedValue = "1234"
+          assert(new String(db.get("a")) === expectedValue)
+          assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
+
+          db.load(2)
+          assert(db.get("a") === null)
+          assert(db.iterator().isEmpty)
+
+          db.load(3)
+          val expectedValue2 = "5678"
+          assert(new String(db.get("a")) === expectedValue2)
+          assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
+        }
+      }
+    }
+  }
+
+  test("merge operator version validation") {
+    // Validation happens at SQLConf level
+    val ex = intercept[SparkIllegalArgumentException] {
+      withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> 
"99") {
+        // This should fail before we get here
       }
     }
+    assert(ex.getMessage.contains("Must be 1 or 2"))
+    assert(ex.getMessage.contains("99"))
   }
 
   testWithStateStoreCheckpointIdsAndColumnFamilies("RocksDBFileManager: delete 
orphan files",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to