This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b2601be71659 [SPARK-55131][SS] Change the default merge operator
delimiter for RocksDB to empty string to concat without delimiter
b2601be71659 is described below
commit b2601be716598f8cc5326382f46b4d0c855c5220
Author: Jungtaek Lim <[email protected]>
AuthorDate: Fri Feb 6 21:54:12 2026 +0900
[SPARK-55131][SS] Change the default merge operator delimiter for RocksDB
to empty string to concat without delimiter
### What changes were proposed in this pull request?
This PR proposes changing the default delimiter for the RocksDB merge operator to an
empty string, so that the merge operation concatenates the two values directly without
adding any delimiter character.
Changing the delimiter isn't compatible with existing checkpoints, so the change is
gated by a SQL config recorded in the offset log metadata (the known trick for
checkpoint-scoped settings), applying the new default only to new streaming queries.
* New SQL config: `spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion` (see the sketch below)
  * Default: 2 (`''` as delimiter)
  * Default for existing checkpoints: 1 (`','` as delimiter)
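For illustration only (the config is internal, and `spark` here is an assumed `SparkSession`),
a new query could be pinned to the old comma-delimited layout before it starts:

```scala
// Assumed SparkSession `spark`; existing checkpoints already stay on version 1
// via the offset log metadata, so this is only needed to opt a *new* checkpoint
// into the old layout.
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion", "1")
```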
### Why are the changes needed?
With the current delimiter there is no way to distinguish two cases: 1) a put against a
non-existent key followed by a merge, and 2) a merge against a non-existent key followed
by a merge. There has been an implicit contract that operators only call merge when they
know the key already exists, which effectively requires a GET first and can be a
significant performance cost depending on the logic.
Making the delimiter an empty string (no delimiter) eliminates the difference between
the two cases, allowing operators to perform a blind merge without checking whether the
key exists.
### Does this PR introduce _any_ user-facing change?
No, the change is internal and there is no user-facing change.
### How was this patch tested?
Added UTs.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored by claude-4.5-sonnet
Closes #54083 from HeartSaVioR/SPARK-55131.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 15 ++
.../streaming/checkpointing/OffsetSeq.scala | 6 +-
.../sql/execution/streaming/state/RocksDB.scala | 52 +++++--
.../streaming/state/RocksDBStateEncoder.scala | 11 +-
.../state/RocksDBStateStoreProvider.scala | 10 +-
.../streaming/state/StateStoreRowChecksum.scala | 20 ++-
.../execution/streaming/OffsetSeqLogSuite.scala | 41 +++++
.../streaming/state/RocksDBStateStoreSuite.scala | 110 +++++++++++++-
.../execution/streaming/state/RocksDBSuite.scala | 165 ++++++++++++++++++---
9 files changed, 375 insertions(+), 55 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7973c652a695..f17199547665 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3045,6 +3045,21 @@ object SQLConf {
// 5 is the default table format version for RocksDB 6.20.3.
.createWithDefault(5)
+ /**
+ * Note: this is defined in `RocksDBConf.MERGE_OPERATOR_VERSION_CONF`. These
two places should
+ * be updated together.
+ */
+ val STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION =
+ buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion")
+ .internal()
+ .doc("Set the RocksDB merge operator version. This will be stored in the
checkpoint when " +
+ "starting a streaming query. The checkpoint will use this merge
operator version for " +
+ "the entire lifetime of the query.")
+ .version("4.2.0")
+ .intConf
+ .checkValue(v => v == 1 || v == 2, "Must be 1 or 2")
+ .createWithDefault(2)
+
val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
.internal()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
index c6245ba1b2a9..23c266e82891 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
@@ -200,7 +200,8 @@ object OffsetSeqMetadata extends Logging {
STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION,
STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
- STATE_STORE_ROCKSDB_FORMAT_VERSION,
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
+ STATE_STORE_ROCKSDB_FORMAT_VERSION,
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION,
+ STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN,
STREAMING_STATE_STORE_ENCODING_FORMAT,
STATE_STORE_ROW_CHECKSUM_ENABLED
)
@@ -249,7 +250,8 @@ object OffsetSeqMetadata extends Logging {
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false",
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true",
STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow",
- STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false"
+ STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false",
+ STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1"
)
def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey:
ConfigEntry[T]): String = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 8d6ae2a180c4..69a7e9618bb3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -127,7 +127,20 @@ class RocksDB(
rocksDbOptions.setMaxOpenFiles(conf.maxOpenFiles)
rocksDbOptions.setAllowFAllocate(conf.allowFAllocate)
rocksDbOptions.setAvoidFlushDuringShutdown(true)
- rocksDbOptions.setMergeOperator(new StringAppendOperator())
+ // Set merge operator based on version for backward compatibility
+ // Version 1: comma delimiter ",", Version 2: empty string ""
+ // NOTE: RocksDB does not document about the behavior of an empty string
delimiter,
+ // only the PR description explains this -
https://github.com/facebook/rocksdb/pull/8536
+ val mergeDelimiter = conf.mergeOperatorVersion match {
+ case 1 => ","
+ case 2 => ""
+ case v => throw new IllegalArgumentException(
+ s"Invalid merge operator version: $v. Supported versions are 1 and 2")
+ }
+ rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))
+
+ // Store delimiter size for use in encode/decode operations
+ private[state] val delimiterSize = mergeDelimiter.length
if (conf.boundedMemoryUsage) {
rocksDbOptions.setWriteBufferManager(writeBufferManager)
@@ -1041,7 +1054,7 @@ class RocksDB(
val (finalKey, value) = getValue(key, cfName)
if (conf.rowChecksumEnabled && value != null) {
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
- readVerifier, finalKey, value)
+ readVerifier, finalKey, value, delimiterSize)
} else {
value
}
@@ -1076,7 +1089,7 @@ class RocksDB(
valuesList.asScala.iterator.zipWithIndex.map {
case (value, idx) if value != null =>
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
- readVerifier, finalKeys(idx), value)
+ readVerifier, finalKeys(idx), value, delimiterSize)
case _ => null
}
} else {
@@ -1118,7 +1131,7 @@ class RocksDB(
val (finalKey, value) = getValue(key, cfName)
KeyValueChecksumEncoder.decodeAndVerifyMultiValueRowWithChecksum(
- readVerifier, finalKey, value)
+ readVerifier, finalKey, value, delimiterSize)
}
/** Returns a tuple of the final key used to store the value in the db and
the value. */
@@ -1234,12 +1247,12 @@ class RocksDB(
} else {
values
}
- // Delimit each value row bytes with a single byte delimiter, the last
+ // Delimit each value row bytes with delimiter, the last
// value row won't have a delimiter at the end.
val delimiterNum = valueWithChecksum.length - 1
// The bytes in valueWithChecksum already include the bytes length prefix
val totalSize = valueWithChecksum.map(_.length).sum +
- delimiterNum // for each delimiter
+ delimiterNum * delimiterSize // for each delimiter
val result = new Array[Byte](totalSize)
var pos = Platform.BYTE_ARRAY_OFFSET
@@ -1249,12 +1262,14 @@ class RocksDB(
Platform.copyMemory(rowBytes, Platform.BYTE_ARRAY_OFFSET, result, pos,
rowBytes.length)
pos += rowBytes.length
- // Add the delimiter - we are using "," as the delimiter
- if (idx < delimiterNum) {
- result(pos - Platform.BYTE_ARRAY_OFFSET) = 44.toByte
+ // Add the delimiter if not the last value and delimiter is not empty
+ if (idx < delimiterNum && delimiterSize > 0) {
+ mergeDelimiter.getBytes.zipWithIndex.foreach { case (b, i) =>
+ result(pos - Platform.BYTE_ARRAY_OFFSET + i) = b
+ }
}
// Move the position for delimiter
- pos += 1
+ pos += delimiterSize
}
result
}
@@ -1474,7 +1489,7 @@ class RocksDB(
val value = if (conf.rowChecksumEnabled) {
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
- readVerifier, iter.key, iter.value)
+ readVerifier, iter.key, iter.value, delimiterSize)
} else {
iter.value
}
@@ -1572,7 +1587,7 @@ class RocksDB(
val value = if (conf.rowChecksumEnabled) {
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
- readVerifier, iter.key, iter.value)
+ readVerifier, iter.key, iter.value, delimiterSize)
} else {
iter.value
}
@@ -2391,6 +2406,7 @@ case class RocksDBConf(
fileChecksumEnabled: Boolean,
rowChecksumEnabled: Boolean,
rowChecksumReadVerificationRatio: Long,
+ mergeOperatorVersion: Int,
stateStoreConf: StateStoreConf)
object RocksDBConf {
@@ -2498,6 +2514,17 @@ object RocksDBConf {
private val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF =
SQLConfEntry(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY, "true")
+ // Configuration to set the merge operator version for backward
compatibility.
+ // Version 1 (default): Uses comma "," as delimiter for StringAppendOperator
+ // Version 2: Uses empty string "" as delimiter (no delimiter, direct
concatenation)
+ //
+ // Note: this is also defined in
`SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`.
+ // These two places should be updated together.
+ val MERGE_OPERATOR_VERSION_CONF_KEY = "mergeOperatorVersion"
+ private val MERGE_OPERATOR_VERSION_CONF =
SQLConfEntry(MERGE_OPERATOR_VERSION_CONF_KEY, "2")
+
+ val MERGE_OPERATOR_VALID_VERSIONS: Seq[Int] = Seq(1, 2)
+
def apply(storeConf: StateStoreConf): RocksDBConf = {
val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -2594,6 +2621,7 @@ object RocksDBConf {
storeConf.checkpointFileChecksumEnabled,
storeConf.rowChecksumEnabled,
storeConf.rowChecksumReadVerificationRatio,
+ getPositiveIntConf(MERGE_OPERATOR_VERSION_CONF),
storeConf)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index 102f38443b8b..4111e7b42623 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -1350,14 +1350,16 @@ object RocksDBStateEncoder extends Logging {
* @param valueSchema Schema defining the structure of values to be encoded
* @param useMultipleValuesPerKey If true, creates an encoder that can
handle multiple values
* per key; if false, creates an encoder for
single values
+ * @param delimiterSize Size of the delimiter used between multiple values
(in bytes)
* @return A configured RocksDBValueStateEncoder instance
*/
def getValueEncoder(
dataEncoder: RocksDBDataEncoder,
valueSchema: StructType,
- useMultipleValuesPerKey: Boolean): RocksDBValueStateEncoder = {
+ useMultipleValuesPerKey: Boolean,
+ delimiterSize: Int = 1): RocksDBValueStateEncoder = {
if (useMultipleValuesPerKey) {
- new MultiValuedStateEncoder(dataEncoder, valueSchema)
+ new MultiValuedStateEncoder(dataEncoder, valueSchema, delimiterSize)
} else {
new SingleValueStateEncoder(dataEncoder, valueSchema)
}
@@ -1726,7 +1728,8 @@ class NoPrefixKeyStateEncoder(
*/
class MultiValuedStateEncoder(
dataEncoder: RocksDBDataEncoder,
- valueSchema: StructType)
+ valueSchema: StructType,
+ delimiterSize: Int)
extends RocksDBValueStateEncoder with Logging {
override def encodeValue(row: UnsafeRow): Array[Byte] = {
@@ -1786,7 +1789,7 @@ class MultiValuedStateEncoder(
numBytes
)
pos += numBytes
- pos += 1 // eat the delimiter character
+ pos += delimiterSize // eat the delimiter based on actual delimiter size
dataEncoder.decodeValue(encodedValue)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 7494c12d028f..6dde67dd5b66 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -237,7 +237,8 @@ private[sql] class RocksDBStateStoreProvider
val valueEncoder = RocksDBStateEncoder.getValueEncoder(
dataEncoder,
valueSchema,
- useMultipleValuesPerKey
+ useMultipleValuesPerKey,
+ rocksDB.delimiterSize
)
keyValueEncoderMap.putIfAbsent(colFamilyName, (keyEncoder, valueEncoder,
cfId))
}
@@ -733,7 +734,8 @@ private[sql] class RocksDBStateStoreProvider
val valueEncoder = RocksDBStateEncoder.getValueEncoder(
dataEncoder,
valueSchema,
- useMultipleValuesPerKey
+ useMultipleValuesPerKey,
+ rocksDB.delimiterSize
)
var cfId: Short = 0
@@ -1443,7 +1445,7 @@ class RocksDBStateStoreChangeDataReader(
val valueBytes = if (storeConf.rowChecksumEnabled &&
nextRecord._1 != RecordType.DELETE_RECORD) {
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
- readVerifier, keyBytes, nextRecord._3)
+ readVerifier, keyBytes, nextRecord._3, rocksDB.delimiterSize)
} else {
nextRecord._3
}
@@ -1466,7 +1468,7 @@ class RocksDBStateStoreChangeDataReader(
(nextRecord._1, key, nextRecord._3)
case _ =>
val value =
KeyValueChecksumEncoder.decodeAndVerifyValueRowWithChecksum(
- readVerifier, nextRecord._2, nextRecord._3)
+ readVerifier, nextRecord._2, nextRecord._3, rocksDB.delimiterSize)
(nextRecord._1, nextRecord._2, value)
}
} else {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
index f02a598600c3..88f2e407bccf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala
@@ -260,12 +260,14 @@ object KeyValueChecksumEncoder {
* @param verifier used for checksum verification.
* @param keyBytes Key bytes for the value to decode, only used for checksum
verification.
* @param valueBytes The value bytes to decode.
+ * @param delimiterSize Size of delimiter used between merged values (in
bytes).
* @return The original value row bytes, without the checksum.
* */
def decodeAndVerifyValueRowWithChecksum(
verifier: Option[KeyValueIntegrityVerifier],
keyBytes: Array[Byte],
- valueBytes: Array[Byte]): Array[Byte] = {
+ valueBytes: Array[Byte],
+ delimiterSize: Int): Array[Byte] = {
// First get the total size of the original values
// Doing this to also support decoding merged values (via merge) e.g.
val1,val2,val3
val valuesEnd = Platform.BYTE_ARRAY_OFFSET + valueBytes.length
@@ -276,14 +278,14 @@ object KeyValueChecksumEncoder {
// skip the checksum (first 4 bytes)
currentPosition += java.lang.Integer.BYTES
val valueRowSize = Platform.getInt(valueBytes, currentPosition)
- // move to the next value and skip the delimiter character used for
rocksdb merge
- currentPosition += java.lang.Integer.BYTES + valueRowSize + 1
+ // move to the next value and skip the delimiter bytes used for rocksdb
merge
+ currentPosition += java.lang.Integer.BYTES + valueRowSize + delimiterSize
resultSize += valueRowSize
numValues += 1
}
// include the number of delimiters used for merge
- resultSize += numValues - 1
+ resultSize += (numValues - 1) * delimiterSize
// now verify and decode to original merged values
val result = new Array[Byte](resultSize)
@@ -308,7 +310,7 @@ object KeyValueChecksumEncoder {
// No delimiter is needed if single value or the last value in
multi-value
val copyLength = if (currentValueCount < numValues) {
- valueRowSize + 1 // copy the delimiter
+ valueRowSize + delimiterSize // copy the delimiter
} else {
valueRowSize
}
@@ -337,12 +339,14 @@ object KeyValueChecksumEncoder {
* @param verifier Used for checksum verification.
* @param keyBytes Key bytes for the value to decode, only used for checksum
verification.
* @param valueBytes The value bytes to decode.
+ * @param delimiterSize Size of delimiter used between merged values (in
bytes).
* @return Iterator of index range representing the original value row
bytes, without checksum.
*/
def decodeAndVerifyMultiValueRowWithChecksum(
verifier: Option[KeyValueIntegrityVerifier],
keyBytes: Array[Byte],
- valueBytes: Array[Byte]): Iterator[ArrayIndexRange[Byte]] = {
+ valueBytes: Array[Byte],
+ delimiterSize: Int): Iterator[ArrayIndexRange[Byte]] = {
if (valueBytes == null) {
Seq().iterator
} else {
@@ -357,8 +361,8 @@ object KeyValueChecksumEncoder {
val (valueRowIndex, checksum) =
getValueRowIndexAndChecksum(valueBytes, startingPosition =
position)
verifier.foreach(_.verify(keyRowIndex, Some(valueRowIndex),
checksum))
- // move to the next value and skip the delimiter character used for
rocksdb merge
- position = byteIndexToOffset(valueRowIndex.untilIndex) + 1
+ // move to the next value and skip the delimiter bytes used for
rocksdb merge
+ position = byteIndexToOffset(valueRowIndex.untilIndex) + delimiterSize
valueRowIndex
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 32082cb62bf0..36bf5e2f8313 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -368,6 +368,47 @@ class OffsetSeqLogSuite extends SharedSparkSession {
}
}
+ test("SPARK-55131: offset log records defaults to merge operator version 2")
{
+ val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0,
batchTimestampMs = 0,
+ spark.conf)
+
assert(offsetSeqMetadata.conf.get(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key)
===
+ Some("2"))
+ }
+
+ test("SPARK-55131: offset log uses the merge operator version set in the
conf") {
+ val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0,
batchTimestampMs = 0,
+ // Trying to set it to non-default value, 1
+ Map(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1"))
+
assert(offsetSeqMetadata.conf.get(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key)
===
+ Some("1"))
+ }
+
+ test("SPARK-55131: Backward compatibility test with merge operator version")
{
+ // Read from the checkpoint which does not have an entry for merge
operator version
+ // in its offset log. This should pick up the value to 1 instead of 2.
+ withTempDir { checkpointDir =>
+ val resourceUri = this.getClass.getResource(
+ "/structured-streaming/checkpoint-version-4.0.0-tws-unsaferow/").toURI
+ Utils.copyDirectory(new File(resourceUri),
checkpointDir.getCanonicalFile)
+
+ val log = new OffsetSeqLog(spark, s"$checkpointDir/offsets")
+ val latestBatchId = log.getLatestBatchId()
+ assert(latestBatchId.isDefined, "No offset log entries found in the
checkpoint location")
+
+ // Read the latest offset log
+ val offsetSeq = log.get(latestBatchId.get).get
+ val offsetSeqMetadata = offsetSeq.metadataOpt.get
+
+ assert(!offsetSeqMetadata.conf
+ .contains(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key),
+ "Merge operator version should be absent in the offset log entry")
+
+ val clonedSqlConf = spark.sessionState.conf.clone()
+ OffsetSeqMetadata.setSessionConf(offsetSeqMetadata, clonedSqlConf)
+
assert(clonedSqlConf.getConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION)
== 1)
+ }
+ }
+
def testWithOffsetV2(
testName: String, testTags: Tag*)(testBody: => Any): Unit = {
super.test(testName, testTags: _*) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 8832d7e09933..3c200b860f52 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -1730,7 +1730,18 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
- test("validate rocksdb values iterator correctness") {
+ private def testMergeWithOperatorVersions(testName: String)(testFn: Int =>
Unit): Unit = {
+ RocksDBConf.MERGE_OPERATOR_VALID_VERSIONS.foreach { version =>
+ test(testName + s" - merge operator version $version") {
+ withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key ->
version.toString) {
+ testFn(version)
+ }
+ }
+ }
+ }
+
+ testMergeWithOperatorVersions(
+ "validate rocksdb values iterator correctness - put then merge") { _ =>
withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
useMultipleValuesPerKey = true)) { provider =>
@@ -1766,6 +1777,103 @@ class RocksDBStateStoreSuite extends
StateStoreSuiteBase[RocksDBStateStoreProvid
}
}
+ test("validate rocksdb values iterator correctness - blind merge with
operator version 2") {
+ withSQLConf(
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2") {
+
+ tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
+ useMultipleValuesPerKey = true)) { provider =>
+ val store = provider.getStore(0)
+ // We do blind merge than put against non-existing key.
+ // Note that this is only safe with merge operator version 2.
+ merge(store, "a", 0, 1)
+
+ val iterator0 = store.valuesIterator(dataToKeyRow("a", 0))
+
+ assert(iterator0.hasNext)
+ assert(valueRowToData(iterator0.next()) === 1)
+ assert(!iterator0.hasNext)
+
+ merge(store, "a", 0, 2)
+ merge(store, "a", 0, 3)
+
+ val iterator1 = store.valuesIterator(dataToKeyRow("a", 0))
+
+ (1 to 3).map { i =>
+ assert(iterator1.hasNext)
+ assert(valueRowToData(iterator1.next()) === i)
+ }
+
+ assert(!iterator1.hasNext)
+
+ store.abort()
+ }
+ }
+ }
+
+ testMergeWithOperatorVersions(
+ "validate rocksdb values iterator correctness - fuzzy merge and put"
+ ) { version =>
+ val seed = System.currentTimeMillis()
+ val rand = new Random(seed)
+ logInfo(s"fuzzy merge and put test using seed: $seed")
+
+ withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
+ tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
+ useMultipleValuesPerKey = true)) { provider =>
+ val store = provider.getStore(0)
+
+ try {
+ val inputValues = scala.collection.mutable.ArrayBuffer[Int]()
+
+ def performPut(value: Int): Unit = {
+ put(store, "a", 0, value)
+ inputValues.clear()
+ inputValues += value.toInt
+ }
+
+ def performMerge(value: Int): Unit = {
+ merge(store, "a", 0, value)
+ inputValues += value.toInt
+ }
+
+ def performRemove(): Unit = {
+ remove(store, _._1 == "a")
+ inputValues.clear()
+ }
+
+ (0 to 100000).foreach { _ =>
+ val op = rand.nextInt(3)
+
+ op match {
+ case 0 =>
+ val value = rand.nextInt(10)
+ performPut(value)
+ case 1 =>
+ val value = rand.nextInt(10)
+ if (inputValues.isEmpty && version == 1) {
+ // version 1 can't handle blind merge against non-existing
key, so we have to
+ // fall back to put
+ performPut(value)
+ } else {
+ performMerge(value)
+ }
+ case 2 =>
+ performRemove()
+ }
+
+ val iterator = store.valuesIterator(dataToKeyRow("a", 0))
+ val valuesInStateStore = iterator.map(valueRowToData).toSeq
+ assert(valuesInStateStore === inputValues)
+ }
+ } finally {
+ store.abort()
+ }
+ }
+ }
+ }
+
/* Column family related tests */
testWithColumnFamiliesAndEncodingTypes("column family creation with invalid
names",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 7aced88d3089..bb02dad76ac6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -35,7 +35,7 @@ import org.scalactic.source.Position
import org.scalatest.PrivateMethodTester
import org.scalatest.Tag
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite,
SparkIllegalArgumentException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.InternalRow
@@ -1502,7 +1502,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// Verify merge operation worked
db.load(0)
db.load(5)
- assert(toStr(db.get("merge_key", StateStore.DEFAULT_COL_FAMILY_NAME))
=== "base,appended")
+ // SPARK-55131: new merge operation concatenates the strings without
any separator
+ assert(toStr(db.get("merge_key", StateStore.DEFAULT_COL_FAMILY_NAME))
=== "baseappended")
}
}
}
@@ -2178,14 +2179,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
}
- test("RocksDB: ensure merge operation correctness") {
+ private def testMergeWithOperatorVersions(testName: String)(testFn: Int =>
Unit): Unit = {
+ RocksDBConf.MERGE_OPERATOR_VALID_VERSIONS.foreach { version =>
+ test(testName + s" - merge operator version $version") {
+ withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key ->
version.toString) {
+ testFn(version)
+ }
+ }
+ }
+ }
+
+ private def expectedResultForMerge(inputs: Seq[String],
mergeOperatorVersion: Int): String = {
+ mergeOperatorVersion match {
+ case 1 => inputs.mkString(",")
+ case 2 => inputs.mkString("")
+ }
+ }
+
+ testMergeWithOperatorVersions("put then merge") { version =>
withTempDir { dir =>
- val remoteDir = Utils.createTempDir().toString
// minDeltasForSnapshot being 5 ensures that only changelog files are
created
// for the 3 commits below
val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false)
- new File(remoteDir).delete() // to make sure that the directory gets
created
- withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
+ withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) { db
=>
db.load(0)
db.put("a", "1")
db.merge("a", "2")
@@ -2200,12 +2216,14 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.commit()
db.load(1)
- assert(new String(db.get("a")) === "1,2")
- assert(db.iterator().map(toStr).toSet === Set(("a", "1,2")))
+ val expectedValue = expectedResultForMerge(Seq("1", "2"), version)
+ assert(new String(db.get("a")) === expectedValue)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
db.load(2)
- assert(new String(db.get("a")) === "1,2,3")
- assert(db.iterator().map(toStr).toSet === Set(("a", "1,2,3")))
+ val expectedValue2 = expectedResultForMerge(Seq("1", "2", "3"),
version)
+ assert(new String(db.get("a")) === expectedValue2)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
db.load(3)
assert(db.get("a") === null)
@@ -2214,17 +2232,59 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
}
- test("RocksDB: ensure putList / mergeList operation correctness") {
+ test("blind merge without put against non-existence key with operator
version 2") {
+ withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2")
{
+ withTempDir { dir =>
+ // minDeltasForSnapshot being 5 ensures that only changelog files are
created
+ // for the 3 commits below
+ val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit =
false)
+ withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) {
db =>
+ db.load(0)
+ // We don't put "a" first here, we call merge against non-existence
key
+ // Note that this is only safe with merge operator version 2 - in
version 1, reader side
+ // can't distinguish the case where the first byte starts with the
size of element, or
+ // delimiter. Merge operator version 2 concatenates the values
directly without delimiter
+ // so that the reader side does not need to distinguish the two
cases.
+ db.merge("a", "1")
+ db.merge("a", "2")
+ db.commit()
+
+ db.load(1)
+ db.remove("a")
+ db.commit()
+
+ db.load(2)
+ db.merge("a", "3")
+ db.merge("a", "4")
+ db.commit()
+
+ db.load(1)
+ val expectedValue = "12"
+ assert(new String(db.get("a")) === expectedValue)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
+
+ db.load(2)
+ assert(db.get("a") === null)
+ assert(db.iterator().isEmpty)
+
+ db.load(3)
+ val expectedValue2 = "34"
+ assert(new String(db.get("a")) === expectedValue2)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
+ }
+ }
+ }
+ }
+
+ testMergeWithOperatorVersions("putList then mergeList") { version =>
withTempDir { dir =>
- val remoteDir = Utils.createTempDir().toString
// minDeltasForSnapshot being 5 ensures that only changelog files are
created
// for the 3 commits below
val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false)
- new File(remoteDir).delete() // to make sure that the directory gets
created
- withDB(remoteDir, conf = conf, useColumnFamilies = true) { db =>
+ withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) { db
=>
db.load(0)
- db.put("a", "1".getBytes)
- db.mergeList("a", Seq("2", "3", "4").map(_.getBytes).toList)
+ db.putList("a", Seq("1", "2").map(_.getBytes).toList)
+ db.mergeList("a", Seq("3", "4").map(_.getBytes).toList)
db.commit()
db.load(1)
@@ -2244,26 +2304,83 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.commit()
db.load(1)
- assert(new String(db.get("a")) === "1,2,3,4")
- assert(db.iterator().map(toStr).toSet === Set(("a", "1,2,3,4")))
+ val expectedValue = expectedResultForMerge(Seq("1", "2", "3", "4"),
version)
+ assert(new String(db.get("a")) === expectedValue)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
db.load(2)
- assert(new String(db.get("a")) === "1,2,3,4,5,6")
- assert(db.iterator().map(toStr).toSet === Set(("a", "1,2,3,4,5,6")))
+ val expectedValue2 = expectedResultForMerge(Seq("1", "2", "3", "4",
"5", "6"), version)
+ assert(new String(db.get("a")) === expectedValue2)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
db.load(3)
assert(db.get("a") === null)
assert(db.iterator().isEmpty)
db.load(4)
- assert(new String(db.get("a")) === "7,8,9")
- assert(db.iterator().map(toStr).toSet === Set(("a", "7,8,9")))
+ val expectedValue3 = expectedResultForMerge(Seq("7", "8", "9"),
version)
+ assert(new String(db.get("a")) === expectedValue3)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue3)))
db.load(5)
- assert(new String(db.get("a")) === "10,11")
- assert(db.iterator().map(toStr).toSet === Set(("a", "10,11")))
+ val expectedValue4 = expectedResultForMerge(Seq("10", "11"), version)
+ assert(new String(db.get("a")) === expectedValue4)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue4)))
+ }
+ }
+ }
+
+ test("blind mergeList without putList against non-existence key with
operator version 2") {
+ withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2")
{
+ withTempDir { dir =>
+ // minDeltasForSnapshot being 5 ensures that only changelog files are
created
+ // for the 3 commits below
+ val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit =
false)
+ withDB(dir.getCanonicalPath, conf = conf, useColumnFamilies = true) {
db =>
+ db.load(0)
+ // We don't putList ("a", "b") first here, we call mergeList against
non-existence key
+ // Note that this is only safe with merge operator version 2, the
same reason we
+ // described in the prior test.
+ db.mergeList("a", Seq("1", "2").map(_.getBytes).toList)
+ db.mergeList("a", Seq("3", "4").map(_.getBytes).toList)
+ db.commit()
+
+ db.load(1)
+ db.remove("a")
+ db.commit()
+
+ db.load(2)
+ db.mergeList("a", Seq("5").map(_.getBytes).toList)
+ db.mergeList("a", Seq("6", "7", "8").map(_.getBytes).toList)
+ db.commit()
+
+ db.load(1)
+ val expectedValue = "1234"
+ assert(new String(db.get("a")) === expectedValue)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue)))
+
+ db.load(2)
+ assert(db.get("a") === null)
+ assert(db.iterator().isEmpty)
+
+ db.load(3)
+ val expectedValue2 = "5678"
+ assert(new String(db.get("a")) === expectedValue2)
+ assert(db.iterator().map(toStr).toSet === Set(("a", expectedValue2)))
+ }
+ }
+ }
+ }
+
+ test("merge operator version validation") {
+ // Validation happens at SQLConf level
+ val ex = intercept[SparkIllegalArgumentException] {
+ withSQLConf(SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key ->
"99") {
+ // This should fail before we get here
}
}
+ assert(ex.getMessage.contains("Must be 1 or 2"))
+ assert(ex.getMessage.contains("99"))
}
testWithStateStoreCheckpointIdsAndColumnFamilies("RocksDBFileManager: delete
orphan files",