Myasuka commented on code in PR #20405:
URL: https://github.com/apache/flink/pull/20405#discussion_r944230268
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java:
##########
@@ -637,11 +642,75 @@ public void testMapStateClear() throws Exception {
throw new RocksDBException("Artificial failure");
})
.when(keyedStateBackend.db)
- .newIterator(any(ColumnFamilyHandle.class),
any(ReadOptions.class));
+ .deleteRange(any(ColumnFamilyHandle.class), any(byte[].class),
any(byte[].class));
state.clear();
}
+ @Test
+ public void testMapStateClearWithOneBytePrefix() throws Exception {
+ verifyMapStateClear(Byte.MAX_VALUE + 1);
+ }
+
+ @Test
+ public void testMapStateClearWithTwoBytesPrefix() throws Exception {
+
verifyMapStateClear(KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM);
+ }
+
+ public void verifyMapStateClear(int maxParallelism) throws Exception {
+ try {
+ prepareRocksDB();
+
+ RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder =
+ RocksDBTestUtils.builderForTestDB(
+ TEMP_FOLDER.newFolder(),
+ IntSerializer.INSTANCE,
+ db,
+ defaultCFHandle,
+ optionsContainer.getColumnOptions(),
+ maxParallelism);
+ keyedStateBackend = keyedStateBackendBuilder.build();
+
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>("id", Integer.class,
String.class);
+ MapState<Integer, String> state =
+ keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ int[] keys = new int[maxParallelism];
+ BitSet bitSet = new BitSet(maxParallelism);
+ // Make sure each keyGroup has a key
+ for (int i = 0; bitSet.cardinality() != maxParallelism; i++) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i,
maxParallelism);
+ if (!bitSet.get(keyGroup)) {
+ bitSet.set(keyGroup);
+ keys[keyGroup] = i;
+ }
+ }
+
+ for (int key : keys) {
+ keyedStateBackend.setCurrentKey(key);
+ state.put(key, "retain " + key);
+ }
+
+ int clearKeyGroup = RandomUtils.nextInt(0, maxParallelism);
+ keyedStateBackend.setCurrentKey(keys[clearKeyGroup]);
+ state.clear();
Review Comment:
Previously, I thought we could clear the map state per key group, one by one,
and verify the results after each clear. That's why I was wondering about the
performance of this unit test.
Could you try this idea and see how long the unit test takes?
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java:
##########
@@ -637,11 +642,75 @@ public void testMapStateClear() throws Exception {
throw new RocksDBException("Artificial failure");
})
.when(keyedStateBackend.db)
- .newIterator(any(ColumnFamilyHandle.class),
any(ReadOptions.class));
+ .deleteRange(any(ColumnFamilyHandle.class), any(byte[].class),
any(byte[].class));
state.clear();
}
+ @Test
+ public void testMapStateClearWithOneBytePrefix() throws Exception {
+ verifyMapStateClear(Byte.MAX_VALUE + 1);
+ }
+
+ @Test
+ public void testMapStateClearWithTwoBytesPrefix() throws Exception {
+
verifyMapStateClear(KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM);
+ }
+
+ public void verifyMapStateClear(int maxParallelism) throws Exception {
+ try {
+ prepareRocksDB();
+
+ RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder =
+ RocksDBTestUtils.builderForTestDB(
+ TEMP_FOLDER.newFolder(),
+ IntSerializer.INSTANCE,
+ db,
+ defaultCFHandle,
+ optionsContainer.getColumnOptions(),
+ maxParallelism);
+ keyedStateBackend = keyedStateBackendBuilder.build();
+
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>("id", Integer.class,
String.class);
+ MapState<Integer, String> state =
+ keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ int[] keys = new int[maxParallelism];
+ BitSet bitSet = new BitSet(maxParallelism);
+ // Make sure each keyGroup has a key
+ for (int i = 0; bitSet.cardinality() != maxParallelism; i++) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i,
maxParallelism);
+ if (!bitSet.get(keyGroup)) {
+ bitSet.set(keyGroup);
+ keys[keyGroup] = i;
+ }
+ }
+
+ for (int key : keys) {
+ keyedStateBackend.setCurrentKey(key);
+ state.put(key, "retain " + key);
+ }
+
+ int clearKeyGroup = RandomUtils.nextInt(0, maxParallelism);
Review Comment:
We can use `ThreadLocalRandom.current().nextInt(0, maxParallelism);` to
avoid depending on a third-party library.
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java:
##########
@@ -637,11 +642,75 @@ public void testMapStateClear() throws Exception {
throw new RocksDBException("Artificial failure");
})
.when(keyedStateBackend.db)
- .newIterator(any(ColumnFamilyHandle.class),
any(ReadOptions.class));
+ .deleteRange(any(ColumnFamilyHandle.class), any(byte[].class),
any(byte[].class));
state.clear();
}
+ @Test
+ public void testMapStateClearWithOneBytePrefix() throws Exception {
+ verifyMapStateClear(Byte.MAX_VALUE + 1);
+ }
+
+ @Test
+ public void testMapStateClearWithTwoBytesPrefix() throws Exception {
+
verifyMapStateClear(KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM);
+ }
+
+ public void verifyMapStateClear(int maxParallelism) throws Exception {
+ try {
+ prepareRocksDB();
+
+ RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder =
+ RocksDBTestUtils.builderForTestDB(
+ TEMP_FOLDER.newFolder(),
+ IntSerializer.INSTANCE,
+ db,
+ defaultCFHandle,
+ optionsContainer.getColumnOptions(),
+ maxParallelism);
+ keyedStateBackend = keyedStateBackendBuilder.build();
+
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>("id", Integer.class,
String.class);
+ MapState<Integer, String> state =
+ keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ int[] keys = new int[maxParallelism];
+ BitSet bitSet = new BitSet(maxParallelism);
+ // Make sure each keyGroup has a key
+ for (int i = 0; bitSet.cardinality() != maxParallelism; i++) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i,
maxParallelism);
+ if (!bitSet.get(keyGroup)) {
+ bitSet.set(keyGroup);
+ keys[keyGroup] = i;
+ }
+ }
+
+ for (int key : keys) {
+ keyedStateBackend.setCurrentKey(key);
+ state.put(key, "retain " + key);
Review Comment:
I wonder why we need to use a string as the map-state value. In the test
with a max-parallelism of 32768, we have to build a new string via
concatenation about 65k times. This is actually unnecessary, as we can just
put an integer value and verify it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]