This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d16183d044a6 [SPARK-47653][SS] Add support for negative numeric types 
and range scan key encoder
d16183d044a6 is described below

commit d16183d044a6207cb6e71cbaa1c942621fec5c12
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Wed Apr 3 16:54:23 2024 +0900

    [SPARK-47653][SS] Add support for negative numeric types and range scan key 
encoder
    
    ### What changes were proposed in this pull request?
    Add support for negative numeric types and range scan key encoder
    
    ### Why are the changes needed?
    Without this change, sort ordering for `-ve` numbers is not maintained on 
iteration. Negative numbers would appear last previously. Note that only 
non-floating integer types such as `short, integer, long` are supported for 
signed values. For float/double, we cannot simply prepend a sign byte given the 
way floating point values are stored in the IEEE 754 floating point 
representation. Additionally we also need to flip all the bits and convert them 
back to the original value on read, in [...]
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    ```
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=true (with changelog checkpointing) (164 milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=true (without changelog checkpointing) (95 milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=false (with changelog checkpointing) (155 milliseconds)
    [info] - rocksdb range scan - with prefix scan - with 
colFamiliesEnabled=false (without changelog checkpointing) (82 milliseconds)
    12:55:54.184 WARN 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite:
    
    ===== POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: 
rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true), 
shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) 
=====
    [info] Run completed in 8 seconds, 888 milliseconds.
    [info] Total number of tests run: 44
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 44, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 21 s, completed Mar 29, 2024, 12:55:54 PM
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45778 from anishshri-db/task/SPARK-47653.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/RocksDBStateEncoder.scala      | 116 ++++++++++++++++++---
 .../sql/execution/streaming/state/StateStore.scala |   6 +-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  92 +++++++++++++---
 3 files changed, 182 insertions(+), 32 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
index f342853514d8..06c3940af127 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.lang.Double.{doubleToRawLongBits, longBitsToDouble}
+import java.lang.Float.{floatToRawIntBits, intBitsToFloat}
 import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.spark.internal.Logging
@@ -206,19 +208,25 @@ class PrefixKeyScanStateEncoder(
  * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's 
fields in BIG_ENDIAN
  * to allow for scanning keys in sorted order using the byte-wise comparison 
method that
  * RocksDB uses.
+ *
  * Then, for the rest of the fields, we project those into another UnsafeRow.
  * We then effectively join these two UnsafeRows together, and finally take 
those bytes
  * to get the resulting row.
+ *
  * We cannot support variable sized fields given the UnsafeRow format which 
stores variable
  * sized fields as offset and length pointers to the actual values, thereby 
changing the required
  * ordering.
+ *
  * Note that we also support "null" values being passed for these fixed size 
fields. We prepend
  * a single byte to indicate whether the column value is null or not. We 
cannot change the
  * nullability on the UnsafeRow itself as the expected ordering would change 
if non-first
  * columns are marked as null. If the first col is null, those entries will 
appear last in
  * the iterator. If non-first columns are null, ordering based on the previous 
columns will
  * still be honored. For rows with null column values, ordering for subsequent 
columns
- * will also be maintained within those set of rows.
+ * will also be maintained within those set of rows. We use the same byte to 
also encode whether
+ * the value is negative or not. For negative float/double values, we flip all 
the bits to ensure
+ * the right lexicographical ordering. For the rationale around this, please 
check the link
+ * here: https://en.wikipedia.org/wiki/IEEE_754#Design_rationale
  *
  * @param keySchema - schema of the key to be encoded
  * @param numOrderingCols - number of columns to be used for range scan
@@ -276,53 +284,114 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative 
float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or 
positive
+  // To ensure sorted ordering, we use the lowest byte value for negative 
numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN 
encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating 
whether the value
-  // is null or not. If the value is null, we write the null byte followed by 
a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by 
zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as 
it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is 
negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
       // Note that we cannot allocate a smaller buffer here even if the value 
is null
       // because the effective byte array is considered variable size and 
needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (value == null) {
+        bbuf.put(nullValMarker)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(positiveValMarker)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
-            bbuf.putShort(value.asInstanceOf[Short])
+            val shortVal = value.asInstanceOf[Short]
+            val signCol = if (shortVal < 0) {
+              negativeValMarker
+            } else {
+              positiveValMarker
+            }
+            bbuf.put(signCol)
+            bbuf.putShort(shortVal)
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
-            bbuf.putInt(value.asInstanceOf[Int])
+            val intVal = value.asInstanceOf[Int]
+            val signCol = if (intVal < 0) {
+              negativeValMarker
+            } else {
+              positiveValMarker
+            }
+            bbuf.put(signCol)
+            bbuf.putInt(intVal)
             writer.write(idx, bbuf.array())
 
           case LongType =>
-            bbuf.putLong(value.asInstanceOf[Long])
+            val longVal = value.asInstanceOf[Long]
+            val signCol = if (longVal < 0) {
+              negativeValMarker
+            } else {
+              positiveValMarker
+            }
+            bbuf.put(signCol)
+            bbuf.putLong(longVal)
             writer.write(idx, bbuf.array())
 
           case FloatType =>
-            bbuf.putFloat(value.asInstanceOf[Float])
+            val floatVal = value.asInstanceOf[Float]
+            val rawBits = floatToRawIntBits(floatVal)
+            // perform sign comparison using bit manipulation to ensure NaN 
values are handled
+            // correctly
+            if ((rawBits & floatSignBitMask) != 0) {
+              // for negative values, we need to flip all the bits to ensure 
correct ordering
+              val updatedVal = rawBits ^ floatFlipBitMask
+              bbuf.put(negativeValMarker)
+              // convert the bits back to float
+              bbuf.putFloat(intBitsToFloat(updatedVal))
+            } else {
+              bbuf.put(positiveValMarker)
+              bbuf.putFloat(floatVal)
+            }
             writer.write(idx, bbuf.array())
 
           case DoubleType =>
-            bbuf.putDouble(value.asInstanceOf[Double])
+            val doubleVal = value.asInstanceOf[Double]
+            val rawBits = doubleToRawLongBits(doubleVal)
+            // perform sign comparison using bit manipulation to ensure NaN 
values are handled
+            // correctly
+            if ((rawBits & doubleSignBitMask) != 0) {
+              // for negative values, we need to flip all the bits to ensure 
correct ordering
+              val updatedVal = rawBits ^ doubleFlipBitMask
+              bbuf.put(negativeValMarker)
+              // convert the bits back to double
+              bbuf.putDouble(longBitsToDouble(updatedVal))
+            } else {
+              bbuf.put(positiveValMarker)
+              bbuf.putDouble(doubleVal)
+            }
             writer.write(idx, bbuf.array())
         }
       }
@@ -336,6 +405,7 @@ class RangeKeyScanStateEncoder(
   // to determine if the value is null or not. If the value is null, we set 
the ordinal on
   // the UnsafeRow to null. If the value is not null, we read the rest of the 
bytes to get the
   // actual value.
+  // For negative float/double values, we need to flip all the bits back to 
get the original value.
   private def decodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
@@ -343,9 +413,9 @@ class RangeKeyScanStateEncoder(
       val value = row.getBinary(idx)
       val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      val isNullCol = bbuf.get()
-      if (isNullCol == 0x01.toByte) {
-        // set the column to null and skip reading the next byte
+      val isNullOrSignCol = bbuf.get()
+      if (isNullOrSignCol == nullValMarker) {
+        // set the column to null and skip reading the next byte(s)
         writer.setNullAt(idx)
       } else {
         field.dataType match {
@@ -363,10 +433,24 @@ class RangeKeyScanStateEncoder(
             writer.write(idx, bbuf.getLong)
 
           case FloatType =>
-            writer.write(idx, bbuf.getFloat)
+            if (isNullOrSignCol == negativeValMarker) {
+              // if the number is negative, get the raw binary bits for the 
float
+              // and flip the bits back
+              val updatedVal = floatToRawIntBits(bbuf.getFloat) ^ 
floatFlipBitMask
+              writer.write(idx, intBitsToFloat(updatedVal))
+            } else {
+              writer.write(idx, bbuf.getFloat)
+            }
 
           case DoubleType =>
-            writer.write(idx, bbuf.getDouble)
+            if (isNullOrSignCol == negativeValMarker) {
+              // if the number is negative, get the raw binary bits for the 
double
+              // and flip the bits back
+              val updatedVal = doubleToRawLongBits(bbuf.getDouble) ^ 
doubleFlipBitMask
+              writer.write(idx, longBitsToDouble(updatedVal))
+            } else {
+              writer.write(idx, bbuf.getDouble)
+            }
         }
       }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index dd97aa5b9afc..d3b3264b8e3d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -303,9 +303,9 @@ case class PrefixKeyScanStateEncoderSpec(
 
 case class RangeKeyScanStateEncoderSpec(
     keySchema: StructType,
-    numColsPrefixKey: Int) extends KeyStateEncoderSpec {
-  if (numColsPrefixKey == 0 || numColsPrefixKey > keySchema.length) {
-    throw 
StateStoreErrors.incorrectNumOrderingColsForRangeScan(numColsPrefixKey.toString)
+    numOrderingCols: Int) extends KeyStateEncoderSpec {
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw 
StateStoreErrors.incorrectNumOrderingColsForRangeScan(numOrderingCols.toString)
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 0b347e272a48..1e5f664c980c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -259,7 +259,8 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
           RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
       }
 
-      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 
8L,
+        -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L)
       timerTimestamps.foreach { ts =>
         // non-timestamp col is of fixed size
         val keyRow = dataToKeyRowWithRangeScan(ts, "a")
@@ -277,7 +278,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       // test with a different set of power of 2 timestamps
       val store1 = provider.getStore(1)
-      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      val timerTimestamps1 = Seq(-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
       timerTimestamps1.foreach { ts =>
         // non-timestamp col is of fixed size
         val keyRow = dataToKeyRowWithRangeScan(ts, "a")
@@ -294,6 +295,62 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering 
columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, 
StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, -0, 0, +0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, 
-66.356, -244.452,
+        96456466.3536677, 14421434453.43524562, Double.NaN, 
Double.PositiveInfinity,
+        Double.NegativeInfinity, -Double.NaN, +0.0, -0.0,
+        // A different representation of NaN than the Java constants
+        java.lang.Double.longBitsToDouble(0x7ff80abcdef54321L),
+        java.lang.Double.longBitsToDouble(0xfff80abcdef54321L))
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      // We expect to find NaNs at the beginning and end of the sorted list
+      var nanIndexSet = Set(0, 1, timerTimestamps.size - 2, 
timerTimestamps.size - 1)
+      val result = store.iterator(cfName).zipWithIndex.map { case (kv, idx) =>
+        val keyRow = kv.key
+        val key = (keyRow.getDouble(0), keyRow.getString(1))
+        if (key._1.isNaN) {
+          assert(nanIndexSet.contains(idx))
+          nanIndexSet -= idx
+        }
+        key._1
+      }.toSeq
+
+      assert(nanIndexSet.isEmpty)
+      assert(result.filter(!_.isNaN) === 
timerTimestamps.sorted.filter(!_.isNaN))
+      store.commit()
+    }
+  }
+
   testWithColumnFamilies("rocksdb range scan - variable size non-ordering 
columns",
     TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
 
@@ -308,7 +365,8 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
           RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
       }
 
-      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L,
+        -24L, -999L, -2L, -61L, -9808344L, -1020L)
       timerTimestamps.foreach { ts =>
         // non-timestamp col is of variable size
         val keyRow = dataToKeyRowWithRangeScan(ts,
@@ -327,7 +385,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       // test with a different set of power of 2 timestamps
       val store1 = provider.getStore(1)
-      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L, -512L, -8192L, 
-16L)
       timerTimestamps1.foreach { ts =>
         // non-timestamp col is of fixed size
         val keyRow = dataToKeyRowWithRangeScan(ts,
@@ -368,7 +426,9 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       }
 
       val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 
68), (90L, 2000),
-        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), 
(9L, 95118), (6L, 87210))
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980),
+        (-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2),
+        (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), 
(-3122L, 323))
       timerTimestamps.foreach { ts =>
         // order by long col first and then by int col
         val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
@@ -411,7 +471,8 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       val timerTimestamps = Seq((931L, 10), (null, 40), (452300L, 1),
         (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), 
(35L, 2112),
-        (6L, 90118), (9L, 95118), (6L, 87210), (null, 113), (null, 28))
+        (6L, 90118), (9L, 95118), (6L, 87210), (null, 113), (null, 28), (null, 
-23), (null, -5534),
+        (-67450L, 2434), (-803L, 3422))
       timerTimestamps.foreach { ts =>
         // order by long col first and then by int col
         val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
@@ -426,7 +487,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
         val keyRow = kv.key
         keyRow.isNullAt(0)
       }
-      assert(nullRows.size === 3)
+      assert(nullRows.size === 5)
 
       // filter out the null rows and verify the rest
       val result: Seq[(Long, Int)] = store.iterator(cfName).filter { kv =>
@@ -440,7 +501,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       val timerTimestampsWithoutNulls = Seq((931L, 10), (452300L, 1),
         (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), 
(35L, 2112),
-        (6L, 90118), (9L, 95118), (6L, 87210))
+        (6L, 90118), (9L, 95118), (6L, 87210), (-67450L, 2434), (-803L, 3422))
 
       assert(result === timerTimestampsWithoutNulls.sorted)
 
@@ -453,7 +514,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
         keyRow.getInt(1)
       }.toSeq
 
-      assert(nullRowsWithOrder === Seq(28, 40, 113))
+      assert(nullRowsWithOrder === Seq(-5534, -23, 28, 40, 113))
 
       store.abort()
 
@@ -464,7 +525,8 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
           RangeKeyScanStateEncoderSpec(testSchema, 2))
       }
 
-      val timerTimestamps1 = Seq((null, 3), (null, 1), (null, 32), (null, 
113), (null, 40872),
+      val timerTimestamps1 = Seq((null, 3), (null, 1), (null, 32),
+        (null, 113), (null, 40872), (null, -675456), (null, -924), (null, 
-666),
         (null, 66))
       timerTimestamps1.foreach { ts =>
         // order by long col first and then by int col
@@ -508,6 +570,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       }
 
       val timerTimestamps = Seq((931L, 10), (40L, null), (452300L, 1),
+        (-133L, null), (-344555L, 2424), (-4342499L, null),
         (4200L, 68), (90L, 2000), (1L, 27), (1L, 394), (1L, 5), (3L, 980), 
(35L, 2112),
         (6L, 90118), (9L, 95118), (6L, 87210), (113L, null), (100L, null))
       timerTimestamps.foreach { ts =>
@@ -524,7 +587,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
         val keyRow = kv.key
         keyRow.isNullAt(1)
       }
-      assert(nullRows.size === 3)
+      assert(nullRows.size === 5)
 
       // the ordering based on first col which has non-null values should be 
preserved
       val result: Seq[(Long, Int)] = store.iterator(cfName)
@@ -561,6 +624,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), 
(0x1F, 1), (0x01, 68),
         (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 
2112),
+        (0x11, -190), (0x1A, -69), (0x01, -344245), (0x31, -901),
         (0x06, 90118), (0x09, 95118), (0x06, 87210))
       timerTimestamps.foreach { ts =>
         // order by byte col first and then by int col
@@ -594,7 +658,9 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
           RangeKeyScanStateEncoderSpec(valueSchema, 1))
       }
 
-      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 
6, 9, 5)
+      val timerTimestamps = Seq(931, 8000, 452300, 4200,
+        -3545, -343, 133, -90, -8014490, -79247,
+        90, 1, 2, 8, 3, 35, 6, 9, 5, -233)
       timerTimestamps.foreach { ts =>
         // non-timestamp col is of variable size
         val keyRow = dataToValueRow(ts)
@@ -634,7 +700,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
           RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
       }
 
-      val timerTimestamps = Seq(931L, 8000L, 1L)
+      val timerTimestamps = Seq(931L, -1331L, 8000L, 1L, -244L, -8350L, -55L)
       timerTimestamps.zipWithIndex.foreach { case (ts, idx) =>
         (1 to idx + 1).foreach { keyVal =>
           val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to