This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d16183d044a6 [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder d16183d044a6 is described below commit d16183d044a6207cb6e71cbaa1c942621fec5c12 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Wed Apr 3 16:54:23 2024 +0900 [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder ### What changes were proposed in this pull request? Add support for negative numeric values to the range scan key encoder. ### Why are the changes needed? Without this change, the sort ordering of negative numbers is not maintained on iteration: because keys are compared byte-wise, negative numbers previously appeared after all non-negative numbers instead of before them. For the integer types (`short`, `int`, `long`), prepending a sign byte that sorts negative values before non-negative ones is sufficient. For `float`/`double`, prepending a sign byte alone is not enough, given the way floating point values are stored in the IEEE 754 representation: for negative values we additionally need to flip all the bits on write and flip them back to recover the original value on read, in [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? 
Added unit tests ``` [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=true (with changelog checkpointing) (164 milliseconds) [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=true (without changelog checkpointing) (95 milliseconds) [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=false (with changelog checkpointing) (155 milliseconds) [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=false (without changelog checkpointing) (82 milliseconds) 12:55:54.184 WARN org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) ===== [info] Run completed in 8 seconds, 888 milliseconds. [info] Total number of tests run: 44 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 44, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [success] Total time: 21 s, completed Mar 29, 2024, 12:55:54 PM ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #45778 from anishshri-db/task/SPARK-47653. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/state/RocksDBStateEncoder.scala | 116 ++++++++++++++++++--- .../sql/execution/streaming/state/StateStore.scala | 6 +- .../streaming/state/RocksDBStateStoreSuite.scala | 92 +++++++++++++--- 3 files changed, 182 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala index f342853514d8..06c3940af127 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming.state +import java.lang.Double.{doubleToRawLongBits, longBitsToDouble} +import java.lang.Float.{floatToRawIntBits, intBitsToFloat} import java.nio.{ByteBuffer, ByteOrder} import org.apache.spark.internal.Logging @@ -206,19 +208,25 @@ class PrefixKeyScanStateEncoder( * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN * to allow for scanning keys in sorted order using the byte-wise comparison method that * RocksDB uses. + * * Then, for the rest of the fields, we project those into another UnsafeRow. * We then effectively join these two UnsafeRows together, and finally take those bytes * to get the resulting row. + * * We cannot support variable sized fields given the UnsafeRow format which stores variable * sized fields as offset and length pointers to the actual values, thereby changing the required * ordering. + * * Note that we also support "null" values being passed for these fixed size fields. We prepend * a single byte to indicate whether the column value is null or not. 
We cannot change the * nullability on the UnsafeRow itself as the expected ordering would change if non-first * columns are marked as null. If the first col is null, those entries will appear last in * the iterator. If non-first columns are null, ordering based on the previous columns will * still be honored. For rows with null column values, ordering for subsequent columns - * will also be maintained within those set of rows. + * will also be maintained within those set of rows. We use the same byte to also encode whether + * the value is negative or not. For negative float/double values, we flip all the bits to ensure + * the right lexicographical ordering. For the rationale around this, please check the link + * here: https://en.wikipedia.org/wiki/IEEE_754#Design_rationale * * @param keySchema - schema of the key to be encoded * @param numOrderingCols - number of columns to be used for range scan @@ -276,53 +284,114 @@ class RangeKeyScanStateEncoder( rangeScanKeyProjection(key) } + // bit masks used for checking sign or flipping all bits for negative float/double values + private val floatFlipBitMask = 0xFFFFFFFF + private val floatSignBitMask = 0x80000000 + + private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL + private val doubleSignBitMask = 0x8000000000000000L + + // Byte markers used to identify whether the value is null, negative or positive + // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by + // positive numbers and then null values. + private val negativeValMarker: Byte = 0x00.toByte + private val positiveValMarker: Byte = 0x01.toByte + private val nullValMarker: Byte = 0x02.toByte + // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding // using byte arrays. // To handle "null" values, we prepend a byte to the byte array indicating whether the value - // is null or not. If the value is null, we write the null byte followed by a zero byte. + // is null or not. 
If the value is null, we write the null byte followed by zero bytes. // If the value is not null, we write the null byte followed by the value. // Note that setting null for the index on the unsafeRow is not feasible as it would change // the sorting order on iteration. + // Also note that the same byte is used to indicate whether the value is negative or not. private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = { val writer = new UnsafeRowWriter(numOrderingCols) writer.resetRowWriter() rangeScanKeyFieldsWithIdx.foreach { case (field, idx) => val value = row.get(idx, field.dataType) - val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte // Note that we cannot allocate a smaller buffer here even if the value is null // because the effective byte array is considered variable size and needs to have // the same size across all rows for the ordering to work as expected. val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1) bbuf.order(ByteOrder.BIG_ENDIAN) - bbuf.put(isNullCol) - if (isNullCol == 0x01.toByte) { + if (value == null) { + bbuf.put(nullValMarker) writer.write(idx, bbuf.array()) } else { field.dataType match { case BooleanType => case ByteType => + bbuf.put(positiveValMarker) bbuf.put(value.asInstanceOf[Byte]) writer.write(idx, bbuf.array()) - // for other multi-byte types, we need to convert to big-endian case ShortType => - bbuf.putShort(value.asInstanceOf[Short]) + val shortVal = value.asInstanceOf[Short] + val signCol = if (shortVal < 0) { + negativeValMarker + } else { + positiveValMarker + } + bbuf.put(signCol) + bbuf.putShort(shortVal) writer.write(idx, bbuf.array()) case IntegerType => - bbuf.putInt(value.asInstanceOf[Int]) + val intVal = value.asInstanceOf[Int] + val signCol = if (intVal < 0) { + negativeValMarker + } else { + positiveValMarker + } + bbuf.put(signCol) + bbuf.putInt(intVal) writer.write(idx, bbuf.array()) case LongType => - bbuf.putLong(value.asInstanceOf[Long]) + val longVal = 
value.asInstanceOf[Long] + val signCol = if (longVal < 0) { + negativeValMarker + } else { + positiveValMarker + } + bbuf.put(signCol) + bbuf.putLong(longVal) writer.write(idx, bbuf.array()) case FloatType => - bbuf.putFloat(value.asInstanceOf[Float]) + val floatVal = value.asInstanceOf[Float] + val rawBits = floatToRawIntBits(floatVal) + // perform sign comparison using bit manipulation to ensure NaN values are handled + // correctly + if ((rawBits & floatSignBitMask) != 0) { + // for negative values, we need to flip all the bits to ensure correct ordering + val updatedVal = rawBits ^ floatFlipBitMask + bbuf.put(negativeValMarker) + // convert the bits back to float + bbuf.putFloat(intBitsToFloat(updatedVal)) + } else { + bbuf.put(positiveValMarker) + bbuf.putFloat(floatVal) + } writer.write(idx, bbuf.array()) case DoubleType => - bbuf.putDouble(value.asInstanceOf[Double]) + val doubleVal = value.asInstanceOf[Double] + val rawBits = doubleToRawLongBits(doubleVal) + // perform sign comparison using bit manipulation to ensure NaN values are handled + // correctly + if ((rawBits & doubleSignBitMask) != 0) { + // for negative values, we need to flip all the bits to ensure correct ordering + val updatedVal = rawBits ^ doubleFlipBitMask + bbuf.put(negativeValMarker) + // convert the bits back to double + bbuf.putDouble(longBitsToDouble(updatedVal)) + } else { + bbuf.put(positiveValMarker) + bbuf.putDouble(doubleVal) + } writer.write(idx, bbuf.array()) } } @@ -336,6 +405,7 @@ class RangeKeyScanStateEncoder( // to determine if the value is null or not. If the value is null, we set the ordinal on // the UnsafeRow to null. If the value is not null, we read the rest of the bytes to get the // actual value. + // For negative float/double values, we need to flip all the bits back to get the original value. 
private def decodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = { val writer = new UnsafeRowWriter(numOrderingCols) writer.resetRowWriter() @@ -343,9 +413,9 @@ class RangeKeyScanStateEncoder( val value = row.getBinary(idx) val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]]) bbuf.order(ByteOrder.BIG_ENDIAN) - val isNullCol = bbuf.get() - if (isNullCol == 0x01.toByte) { - // set the column to null and skip reading the next byte + val isNullOrSignCol = bbuf.get() + if (isNullOrSignCol == nullValMarker) { + // set the column to null and skip reading the next byte(s) writer.setNullAt(idx) } else { field.dataType match { @@ -363,10 +433,24 @@ class RangeKeyScanStateEncoder( writer.write(idx, bbuf.getLong) case FloatType => - writer.write(idx, bbuf.getFloat) + if (isNullOrSignCol == negativeValMarker) { + // if the number is negative, get the raw binary bits for the float + // and flip the bits back + val updatedVal = floatToRawIntBits(bbuf.getFloat) ^ floatFlipBitMask + writer.write(idx, intBitsToFloat(updatedVal)) + } else { + writer.write(idx, bbuf.getFloat) + } case DoubleType => - writer.write(idx, bbuf.getDouble) + if (isNullOrSignCol == negativeValMarker) { + // if the number is negative, get the raw binary bits for the double + // and flip the bits back + val updatedVal = doubleToRawLongBits(bbuf.getDouble) ^ doubleFlipBitMask + writer.write(idx, longBitsToDouble(updatedVal)) + } else { + writer.write(idx, bbuf.getDouble) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index dd97aa5b9afc..d3b3264b8e3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -303,9 +303,9 @@ case class PrefixKeyScanStateEncoderSpec( case class RangeKeyScanStateEncoderSpec( 
keySchema: StructType, - numColsPrefixKey: Int) extends KeyStateEncoderSpec { - if (numColsPrefixKey == 0 || numColsPrefixKey > keySchema.length) { - throw StateStoreErrors.incorrectNumOrderingColsForRangeScan(numColsPrefixKey.toString) + numOrderingCols: Int) extends KeyStateEncoderSpec { + if (numOrderingCols == 0 || numOrderingCols > keySchema.length) { + throw StateStoreErrors.incorrectNumOrderingColsForRangeScan(numOrderingCols.toString) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index 0b347e272a48..1e5f664c980c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -259,7 +259,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)) } - val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) + val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L, + -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L) timerTimestamps.foreach { ts => // non-timestamp col is of fixed size val keyRow = dataToKeyRowWithRangeScan(ts, "a") @@ -277,7 +278,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid // test with a different set of power of 2 timestamps val store1 = provider.getStore(1) - val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L) + val timerTimestamps1 = Seq(-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L) timerTimestamps1.foreach { ts => // non-timestamp col is of fixed size val keyRow = dataToKeyRowWithRangeScan(ts, "a") @@ -294,6 +295,62 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } } + 
testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " + + "double type values are supported", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + val testSchema: StructType = StructType( + Seq(StructField("key1", DoubleType, false), + StructField("key2", StringType, false))) + + val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType)) + tryWithProviderResource(newStoreProvider(testSchema, + RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + testSchema, valueSchema, + RangeKeyScanStateEncoderSpec(testSchema, 1)) + } + + // Verify that the sort ordering here is as follows: + // -NaN, -Infinity, -ve values, -0, 0, +0, +ve values, +Infinity, +NaN + val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466, + 7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, -66.356, -244.452, + 96456466.3536677, 14421434453.43524562, Double.NaN, Double.PositiveInfinity, + Double.NegativeInfinity, -Double.NaN, +0.0, -0.0, + // A different representation of NaN than the Java constants + java.lang.Double.longBitsToDouble(0x7ff80abcdef54321L), + java.lang.Double.longBitsToDouble(0xfff80abcdef54321L)) + timerTimestamps.foreach { ts => + // non-timestamp col is of variable size + val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts, + UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)))) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + // We expect to find NaNs at the beginning and end of the sorted list + var nanIndexSet = Set(0, 1, timerTimestamps.size - 2, timerTimestamps.size - 1) + val result = store.iterator(cfName).zipWithIndex.map { 
case (kv, idx) => + val keyRow = kv.key + val key = (keyRow.getDouble(0), keyRow.getString(1)) + if (key._1.isNaN) { + assert(nanIndexSet.contains(idx)) + nanIndexSet -= idx + } + key._1 + }.toSeq + + assert(nanIndexSet.isEmpty) + assert(result.filter(!_.isNaN) === timerTimestamps.sorted.filter(!_.isNaN)) + store.commit() + } + } + testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns", TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => @@ -308,7 +365,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)) } - val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) + val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L, + -24L, -999L, -2L, -61L, -9808344L, -1020L) timerTimestamps.foreach { ts => // non-timestamp col is of variable size val keyRow = dataToKeyRowWithRangeScan(ts, @@ -327,7 +385,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid // test with a different set of power of 2 timestamps val store1 = provider.getStore(1) - val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L) + val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L, -512L, -8192L, -16L) timerTimestamps1.foreach { ts => // non-timestamp col is of fixed size val keyRow = dataToKeyRowWithRangeScan(ts, @@ -368,7 +426,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000), - (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210)) + (1L, 27), (1L, 394), (1L, 5), (3L, 980), + (-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2), + (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), (-3122L, 323)) timerTimestamps.foreach { ts => // order by long col first and 
then by int col val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2, @@ -411,7 +471,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val timerTimestamps = Seq((931L, 10), (null, 40), (452300L, 1), (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), - (6L, 90118), (9L, 95118), (6L, 87210), (null, 113), (null, 28)) + (6L, 90118), (9L, 95118), (6L, 87210), (null, 113), (null, 28), (null, -23), (null, -5534), + (-67450L, 2434), (-803L, 3422)) timerTimestamps.foreach { ts => // order by long col first and then by int col val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2, @@ -426,7 +487,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val keyRow = kv.key keyRow.isNullAt(0) } - assert(nullRows.size === 3) + assert(nullRows.size === 5) // filter out the null rows and verify the rest val result: Seq[(Long, Int)] = store.iterator(cfName).filter { kv => @@ -440,7 +501,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val timerTimestampsWithoutNulls = Seq((931L, 10), (452300L, 1), (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), - (6L, 90118), (9L, 95118), (6L, 87210)) + (6L, 90118), (9L, 95118), (6L, 87210), (-67450L, 2434), (-803L, 3422)) assert(result === timerTimestampsWithoutNulls.sorted) @@ -453,7 +514,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid keyRow.getInt(1) }.toSeq - assert(nullRowsWithOrder === Seq(28, 40, 113)) + assert(nullRowsWithOrder === Seq(-5534, -23, 28, 40, 113)) store.abort() @@ -464,7 +525,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid RangeKeyScanStateEncoderSpec(testSchema, 2)) } - val timerTimestamps1 = Seq((null, 3), (null, 1), (null, 32), (null, 113), (null, 40872), + val timerTimestamps1 = Seq((null, 3), (null, 1), (null, 32), + (null, 113), (null, 
40872), (null, -675456), (null, -924), (null, -666), (null, 66)) timerTimestamps1.foreach { ts => // order by long col first and then by int col @@ -508,6 +570,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } val timerTimestamps = Seq((931L, 10), (40L, null), (452300L, 1), + (-133L, null), (-344555L, 2424), (-4342499L, null), (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (113L, null), (100L, null)) timerTimestamps.foreach { ts => @@ -524,7 +587,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val keyRow = kv.key keyRow.isNullAt(1) } - assert(nullRows.size === 3) + assert(nullRows.size === 5) // the ordering based on first col which has non-null values should be preserved val result: Seq[(Long, Int)] = store.iterator(cfName) @@ -561,6 +624,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68), (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112), + (0x11, -190), (0x1A, -69), (0x01, -344245), (0x31, -901), (0x06, 90118), (0x09, 95118), (0x06, 87210)) timerTimestamps.foreach { ts => // order by byte col first and then by int col @@ -594,7 +658,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid RangeKeyScanStateEncoderSpec(valueSchema, 1)) } - val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 6, 9, 5) + val timerTimestamps = Seq(931, 8000, 452300, 4200, + -3545, -343, 133, -90, -8014490, -79247, + 90, 1, 2, 8, 3, 35, 6, 9, 5, -233) timerTimestamps.foreach { ts => // non-timestamp col is of variable size val keyRow = dataToValueRow(ts) @@ -634,7 +700,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)) } - val timerTimestamps = Seq(931L, 
8000L, 1L) + val timerTimestamps = Seq(931L, -1331L, 8000L, 1L, -244L, -8350L, -55L) timerTimestamps.zipWithIndex.foreach { case (ts, idx) => (1 to idx + 1).foreach { keyVal => val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org