micheal-o commented on code in PR #55226:
URL: https://github.com/apache/spark/pull/55226#discussion_r3049187197


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1662,6 +1662,93 @@ class RocksDB(
     }
   }
 
+  /**
+   * Scan key-value pairs in the range [startKey, endKey).
+   *
+   * @param startKey None to seek to the beginning of the column family,
+   *                 or Some(key) to seek to the given start position 
(inclusive).
+   * @param endKey   None to scan to the end of the column family,
+   *                 or Some(key) as the exclusive upper bound for the scan 
(encoded key bytes).
+   * @param cfName   The column family name.
+   * @return An iterator of ByteArrayPairs in the given range.
+   */
+  def scan(

Review Comment:
   nit: consider renaming this method to `rangeScan`.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1629,6 +1629,217 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  private val diverseTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 
1L, 2L, 8L,
+    -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L,
+    -32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
+
+  testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - rangeScan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>

Review Comment:
   Why does this test need to run with changelog checkpointing both enabled and disabled? The same question applies to the other tests below.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1629,6 +1629,217 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  private val diverseTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 
1L, 2L, 8L,
+    -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L,
+    -32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
+
+  testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - rangeScan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+      colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+      try {
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(cfName,
+            keySchemaWithRangeScan, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+        }
+
+        diverseTimestamps.foreach { ts =>
+          store.put(dataToKeyRowWithRangeScan(ts, "a"), 
dataToValueRow(ts.toInt), cfName)
+        }
+
+        // Bounded positive range [0, 100)
+        val boundedIter = store.rangeScan(
+          Some(dataToKeyRowWithRangeScan(0L, "a")),
+          Some(dataToKeyRowWithRangeScan(100L, "a")), cfName)
+        val boundedResults = boundedIter.map { pair =>
+          (pair.key.getLong(0), pair.value.getInt(0))
+        }.toList
+        boundedIter.close()
+        val expectedBoundedTs = diverseTimestamps.filter(ts => ts >= 0 && ts < 
100).sorted
+        assert(boundedResults.map(_._1) === expectedBoundedTs)
+        assert(boundedResults.map(_._2) === expectedBoundedTs.map(_.toInt))
+
+        // Exact bound: startKey is inclusive, endKey is exclusive.
+        // 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
+        // Scan [9, 90) should include 9 but exclude 90.
+        val exactIter = store.rangeScan(
+          Some(dataToKeyRowWithRangeScan(9L, "a")),
+          Some(dataToKeyRowWithRangeScan(90L, "a")), cfName)
+        val exactResults = exactIter.map(_.key.getLong(0)).toList
+        exactIter.close()
+        assert(exactResults === diverseTimestamps.filter(ts => ts >= 9 && ts < 
90).sorted)
+        assert(exactResults.contains(9L))
+        assert(!exactResults.contains(90L))
+
+        // None startKey scans from beginning to 0
+        val noneStartIter = store.rangeScan(
+          None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
+        val noneStartResults = noneStartIter.map(_.key.getLong(0)).toList
+        noneStartIter.close()
+        assert(noneStartResults === diverseTimestamps.filter(_ < 0).sorted)
+
+        // None endKey scans from 1000 to end
+        val noneEndIter = store.rangeScan(
+          Some(dataToKeyRowWithRangeScan(1000L, "a")), None, cfName)
+        val noneEndResults = noneEndIter.map(_.key.getLong(0)).toList
+        noneEndIter.close()
+        assert(noneEndResults === diverseTimestamps.filter(_ >= 1000).sorted)
+
+        // Empty range [10, 31) - no entries between 9 and 32
+        val emptyIter = store.rangeScan(
+          Some(dataToKeyRowWithRangeScan(10L, "a")),
+          Some(dataToKeyRowWithRangeScan(31L, "a")), cfName)
+        assert(!emptyIter.hasNext)
+        emptyIter.close()
+
+        // Bounded negative range [-300, 0)
+        val negIter = store.rangeScan(
+          Some(dataToKeyRowWithRangeScan(-300L, "a")),
+          Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
+        val negResults = negIter.map(_.key.getLong(0)).toList
+        negIter.close()
+        assert(negResults === diverseTimestamps.filter(ts => ts >= -300 && ts 
< 0).sorted)

Review Comment:
   Should we also test the fully unbounded case, i.e. `startKey = None` and `endKey = None`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to