neilramaswamy commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1533103809


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -39,13 +41,21 @@ sealed trait RocksDBValueStateEncoder {
 }
 
 object RocksDBStateEncoder {
-  def getKeyEncoder(
-      keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+  def getKeyEncoder(keyStateEncoderSpec: KeyStateEncoderSpec): 
RocksDBKeyStateEncoder = {
+    // Return the key state encoder based on the requested type
+    keyStateEncoderSpec match {
+      case NoPrefixKeyStateEncoderSpec(keySchema) =>
+        new NoPrefixKeyStateEncoder(keySchema)
+
+      case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
+        new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
+
+      case RangeKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>

Review Comment:
   `numOrderingCols` ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size 
fields

Review Comment:
   Nowhere in this do we mention why we cannot range scan non-fixed size fields 
(I understand, but maybe future readers won't). It's not mentioned in the error 
message either. Can we do that?
   
   I think also a quick comment explaining how this actually works would be 
helpful, i.e. "To encode a row `r` for a range scan, we first project the first 
`numOrderingCols` needed for the range scan into an `UnsafeRow`; we then 
rewrite that `UnsafeRow`'s fields in big endian. Then, for the rest of the 
fields, we project those into an `UnsafeRow`. We then effectively join these 
latter two `UnsafeRow`s together, and finally take those bytes to get the 
resulting row." For example, NoPrefixKeyStateEncoder has a comment like this.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size 
fields
+ * Note that for range scan, we have to encode the ordering columns using 
BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise 
comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key

Review Comment:
   nit typo



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +161,360 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 0),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 
keySchemaWithRangeScan.length + 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", 
StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithVariableSizeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - null type columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val keySchemaWithNullTypeCols: StructType = StructType(
+      Seq(StructField("key1", NullType, false), StructField("key2", 
StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithNullTypeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithNullTypeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithNullTypeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering 
columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 
3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - 
variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, 
IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 
68), (90L, 2000),
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), 
(9L, 95118), (6L, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan byte ordering column - variable 
size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", ByteType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, 
IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), 
(0x1F, 1), (0x01, 68),
+        (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 
2112),
+        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by byte col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, 
ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result: Seq[(Byte, Int)] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - ordering cols and key schema 
cols are same",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    // use the same schema as value schema for single col key schema
+    tryWithProviderResource(newStoreProvider(valueSchema,
+      RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { 
provider =>
+      val store = provider.getStore(0)
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          valueSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(valueSchema, 1))
+      }
+
+      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 
6, 9, 5)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToValueRow(ts)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        valueRowToData(kv.key)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+
+      // also check for prefix scan
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToValueRow(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          valueRowToData(kv.key)
+        }.toSeq
+        assert(result.size === 1)
+      }
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - with prefix scan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), 
colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 1L)
+      timerTimestamps.foreach { ts =>
+        (1 to 5).foreach { keyVal =>
+          val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
+      }
+
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToPrefixKeyRowWithRangeScan(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          val key = keyRowWithRangeScanToData(kv.key)
+          key._2
+        }.toSeq
+        assert(result.size === 5)

Review Comment:
   Since we're only asserting about the size, I _think_ a buggy implementation 
of `prefixScan`, where it performs a prefix scan incorrectly (i.e. based on 
some key other than `prefix`) would still pass. For example, if you wanted to 
scan based on `8000L` and it instead did a scan using some other prefix (like 
`931L`), this test would still pass.
   
   Instead of asserting it's always equal to 5, what if, for timer at index `i` 
in `timerTimestamps`, we inserted `i+1` elements. Then, we would assert, after 
zipping the timestamps with index, that `results.size == i + 1`. With such a 
test, I don't think that a `prefixScan` implementation would be able to 
accidentally scan the wrong key and still pass this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to