neilramaswamy commented on code in PR #45503: URL: https://github.com/apache/spark/pull/45503#discussion_r1533103809
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -39,13 +41,21 @@ sealed trait RocksDBValueStateEncoder { } object RocksDBStateEncoder { - def getKeyEncoder( - keySchema: StructType, - numColsPrefixKey: Int): RocksDBKeyStateEncoder = { - if (numColsPrefixKey > 0) { - new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey) - } else { - new NoPrefixKeyStateEncoder(keySchema) + def getKeyEncoder(keyStateEncoderSpec: KeyStateEncoderSpec): RocksDBKeyStateEncoder = { + // Return the key state encoder based on the requested type + keyStateEncoderSpec match { + case NoPrefixKeyStateEncoderSpec(keySchema) => + new NoPrefixKeyStateEncoder(keySchema) + + case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) => + new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey) + + case RangeKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) => Review Comment: Should this be named `numOrderingCols`? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder( override def supportPrefixKeyScan: Boolean = true } +/** + * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields Review Comment: Nowhere in this doc comment do we explain why we cannot range scan variable-size fields (I understand, but future readers may not). It's not mentioned in the error message either. Can we add that? I also think a short comment explaining how the encoding actually works would be helpful, e.g. "To encode a row `r` for a range scan, we first project the first `numOrderingCols` columns needed for the range scan into an `UnsafeRow`; we then rewrite that `UnsafeRow`'s fields in big endian. Then, we project the remaining fields into a second `UnsafeRow`. We then effectively join these two `UnsafeRow`s together, and finally take those bytes as the resulting encoded row."
For example, NoPrefixKeyStateEncoder has a comment like this. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder( override def supportPrefixKeyScan: Boolean = true } +/** + * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields + * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN + * encoding to allow for scanning keys in sorted order using the byte-wise comparison + * method that RocksDB uses. + * + * @param keySchema - schema of the key to be encoded + * @param numColsPrefixKey - number of columns to be used for prefix key Review Comment: nit typo ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala: ########## @@ -158,14 +161,360 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } } + testWithColumnFamilies("rocksdb range scan validation - invalid num columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + // zero ordering cols + val ex1 = intercept[SparkUnsupportedOperationException] { + tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 0), + colFamiliesEnabled)) { provider => + provider.getStore(0) + } + } + checkError( + ex1, + errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN", + parameters = Map( + "numOrderingCols" -> "0" + ), + matchPVals = true + ) + + // ordering cols greater than schema cols + val ex2 = intercept[SparkUnsupportedOperationException] { + tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, keySchemaWithRangeScan.length + 1), + colFamiliesEnabled)) { provider => + provider.getStore(0) + } + } + checkError( + ex2, + errorClass = 
"STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN", + parameters = Map( + "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString + ), + matchPVals = true + ) + } + + testWithColumnFamilies("rocksdb range scan validation - variable sized columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + val keySchemaWithVariableSizeCols: StructType = StructType( + Seq(StructField("key1", StringType, false), StructField("key2", StringType, false))) + + val ex = intercept[SparkUnsupportedOperationException] { + tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols, + RangeKeyScanStateEncoderSpec(keySchemaWithVariableSizeCols, 1), + colFamiliesEnabled)) { provider => + provider.getStore(0) + } + } + checkError( + ex, + errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED", + parameters = Map( + "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name, + "index" -> "0" + ), + matchPVals = true + ) + } + + testWithColumnFamilies("rocksdb range scan validation - null type columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + val keySchemaWithNullTypeCols: StructType = StructType( + Seq(StructField("key1", NullType, false), StructField("key2", StringType, false))) + + val ex = intercept[SparkUnsupportedOperationException] { + tryWithProviderResource(newStoreProvider(keySchemaWithNullTypeCols, + RangeKeyScanStateEncoderSpec(keySchemaWithNullTypeCols, 1), + colFamiliesEnabled)) { provider => + provider.getStore(0) + } + } + checkError( + ex, + errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED", + parameters = Map( + "fieldName" -> keySchemaWithNullTypeCols.fields(0).name, + "index" -> "0" + ), + matchPVals = true + ) + } + + testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan, 
+ RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + // use non-default col family if column families are enabled + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + keySchemaWithRangeScan, valueSchema, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)) + } + + val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) + timerTimestamps.foreach { ts => + // non-timestamp col is of fixed size + val keyRow = dataToKeyRowWithRangeScan(ts, "a") + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + val result = store.iterator(cfName).map { kv => + val key = keyRowWithRangeScanToData(kv.key) + key._1 + }.toSeq + assert(result === timerTimestamps.sorted) + store.commit() + + // test with a different set of power of 2 timestamps + val store1 = provider.getStore(1) + val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L) + timerTimestamps1.foreach { ts => + // non-timestamp col is of fixed size + val keyRow = dataToKeyRowWithRangeScan(ts, "a") + val valueRow = dataToValueRow(1) + store1.put(keyRow, valueRow, cfName) + assert(valueRowToData(store1.get(keyRow, cfName)) === 1) + } + + val result1 = store1.iterator(cfName).map { kv => + val key = keyRowWithRangeScanToData(kv.key) + key._1 + }.toSeq + assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted) + } + } + + testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + val cfName = if (colFamiliesEnabled) "testColFamily" 
else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + keySchemaWithRangeScan, valueSchema, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)) + } + + val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L) + timerTimestamps.foreach { ts => + // non-timestamp col is of variable size + val keyRow = dataToKeyRowWithRangeScan(ts, + Random.alphanumeric.take(Random.nextInt(20) + 1).mkString) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + val result = store.iterator(cfName).map { kv => + val key = keyRowWithRangeScanToData(kv.key) + key._1 + }.toSeq + assert(result === timerTimestamps.sorted) + store.commit() + + // test with a different set of power of 2 timestamps + val store1 = provider.getStore(1) + val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L) + timerTimestamps1.foreach { ts => + // non-timestamp col is of fixed size + val keyRow = dataToKeyRowWithRangeScan(ts, + Random.alphanumeric.take(Random.nextInt(20) + 1).mkString) + val valueRow = dataToValueRow(1) + store1.put(keyRow, valueRow, cfName) + assert(valueRowToData(store1.get(keyRow, cfName)) === 1) + } + + val result1 = store1.iterator(cfName).map { kv => + val key = keyRowWithRangeScanToData(kv.key) + key._1 + }.toSeq + assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted) + } + } + + testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " + + s"non-ordering columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + val testSchema: StructType = StructType( + Seq(StructField("key1", LongType, false), + StructField("key2", IntegerType, false), + StructField("key3", StringType, false))) + + val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType)) + + tryWithProviderResource(newStoreProvider(testSchema, + 
RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + testSchema, valueSchema, + RangeKeyScanStateEncoderSpec(testSchema, 2)) + } + + val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000), + (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210)) + timerTimestamps.foreach { ts => + // order by long col first and then by int col + val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2, + UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)))) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + val result = store.iterator(cfName).map { kv => + val keyRow = kv.key + val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2)) + (key._1, key._2) + }.toSeq + assert(result === timerTimestamps.sorted) + } + } + + testWithColumnFamilies("rocksdb range scan byte ordering column - variable size " + + s"non-ordering columns", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + val testSchema: StructType = StructType( + Seq(StructField("key1", ByteType, false), + StructField("key2", IntegerType, false), + StructField("key3", StringType, false))) + + val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, IntegerType, StringType)) + + tryWithProviderResource(newStoreProvider(testSchema, + RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + testSchema, valueSchema, + RangeKeyScanStateEncoderSpec(testSchema, 2)) + } + + val 
timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68), + (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112), + (0x06, 90118), (0x09, 95118), (0x06, 87210)) + timerTimestamps.foreach { ts => + // order by byte col first and then by int col + val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2, + UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)))) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + val result: Seq[(Byte, Int)] = store.iterator(cfName).map { kv => + val keyRow = kv.key + val key = (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2)) + (key._1, key._2) + }.toSeq + assert(result === timerTimestamps.sorted) + } + } + + testWithColumnFamilies("rocksdb range scan - ordering cols and key schema cols are same", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + // use the same schema as value schema for single col key schema + tryWithProviderResource(newStoreProvider(valueSchema, + RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + valueSchema, valueSchema, + RangeKeyScanStateEncoderSpec(valueSchema, 1)) + } + + val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 6, 9, 5) + timerTimestamps.foreach { ts => + // non-timestamp col is of variable size + val keyRow = dataToValueRow(ts) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + val result = store.iterator(cfName).map { kv => + valueRowToData(kv.key) + }.toSeq + assert(result === timerTimestamps.sorted) + + // also check for prefix scan + timerTimestamps.foreach { 
ts => + val prefix = dataToValueRow(ts) + val result = store.prefixScan(prefix, cfName).map { kv => + assert(valueRowToData(kv.value) === 1) + valueRowToData(kv.key) + }.toSeq + assert(result.size === 1) + } + } + } + + testWithColumnFamilies("rocksdb range scan - with prefix scan", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + + tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + keySchemaWithRangeScan, valueSchema, + RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1)) + } + + val timerTimestamps = Seq(931L, 8000L, 1L) + timerTimestamps.foreach { ts => + (1 to 5).foreach { keyVal => + val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + } + + timerTimestamps.foreach { ts => + val prefix = dataToPrefixKeyRowWithRangeScan(ts) + val result = store.prefixScan(prefix, cfName).map { kv => + assert(valueRowToData(kv.value) === 1) + val key = keyRowWithRangeScanToData(kv.key) + key._2 + }.toSeq + assert(result.size === 5) Review Comment: Since we're only asserting on the size, I _think_ a buggy implementation of `prefixScan` that scans incorrectly (e.g., using some key other than `prefix`) would still pass. For example, if you wanted to scan by `8000L` and it instead scanned using some other prefix (like `931L`), this test would still pass, because every prefix has exactly 5 entries. Instead of asserting that the size is always 5, what if, for the timestamp at index `i` in `timerTimestamps`, we inserted `i + 1` elements? Then, after zipping the timestamps with their indices, we would assert that `result.size == i + 1`.
Because each prefix would then have a distinct number of entries, a `prefixScan` implementation that accidentally scanned the wrong key could no longer pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org