This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new fde0c29 [FLINK-36622] Remove the dependency on RocksDBKeyedStateBackend APIs. (#98)
fde0c29 is described below
commit fde0c298319f6d00a8fc411c2c770d12c853fde6
Author: AlexYinHan <[email protected]>
AuthorDate: Mon Nov 4 10:25:57 2024 +0800
[FLINK-36622] Remove the dependency on RocksDBKeyedStateBackend APIs. (#98)
---
.../apache/flink/state/benchmark/ListStateBenchmark.java | 13 ++-----------
.../flink/state/benchmark/ttl/TtlListStateBenchmark.java | 13 ++-----------
2 files changed, 4 insertions(+), 22 deletions(-)
diff --git a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
index 2b310c2..c8c6f2d 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.flink.state.benchmark;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
@@ -81,11 +80,7 @@ public class ListStateBenchmark extends StateBenchmarkBase {
// make sure only one sst file left, so all get invocation will access this single file,
// to prevent the spike caused by different key distribution in multiple sst files,
// the more access to the older sst file, the lower throughput will be.
- if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
- RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
- (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
- compactState(rocksDBKeyedStateBackend, STATE_DESC);
- }
+ compactState(keyedStateBackend, STATE_DESC);
}
@TearDown(Level.Iteration)
@@ -98,11 +93,7 @@ public class ListStateBenchmark extends StateBenchmarkBase {
state.clear();
});
// make the clearance effective, trigger compaction for RocksDB, and GC for heap.
- if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
- RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
- (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
- compactState(rocksDBKeyedStateBackend, STATE_DESC);
- } else {
+ if (!compactState(keyedStateBackend, STATE_DESC)) {
System.gc();
}
// wait a while for the clearance to take effect.
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
index 1977cf6..829b440 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.flink.state.benchmark.ttl;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.state.benchmark.StateBenchmarkBase;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
@@ -82,11 +81,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
// make sure only one sst file left, so all get invocation will access this single file,
// to prevent the spike caused by different key distribution in multiple sst files,
// the more access to the older sst file, the lower throughput will be.
- if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
- RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
- (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
- compactState(rocksDBKeyedStateBackend, stateDesc);
- }
+ compactState(keyedStateBackend, stateDesc);
advanceTimePerIteration();
}
@@ -100,11 +95,7 @@ public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
state.clear();
});
// make the clearance effective, trigger compaction for RocksDB, and GC for heap.
- if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
- RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
- (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
- compactState(rocksDBKeyedStateBackend, stateDesc);
- } else {
+ if (!compactState(keyedStateBackend, stateDesc)) {
System.gc();
}
// wait a while for the clearance to take effect.