This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new fde0c29  [FLINK-36622] Remove the dependency on 
RocksDBKeyedStateBackend APIs. (#98)
fde0c29 is described below

commit fde0c298319f6d00a8fc411c2c770d12c853fde6
Author: AlexYinHan <[email protected]>
AuthorDate: Mon Nov 4 10:25:57 2024 +0800

    [FLINK-36622] Remove the dependency on RocksDBKeyedStateBackend APIs. (#98)
---
 .../apache/flink/state/benchmark/ListStateBenchmark.java    | 13 ++-----------
 .../flink/state/benchmark/ttl/TtlListStateBenchmark.java    | 13 ++-----------
 2 files changed, 4 insertions(+), 22 deletions(-)

diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
index 2b310c2..c8c6f2d 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.flink.state.benchmark;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
@@ -81,11 +80,7 @@ public class ListStateBenchmark extends StateBenchmarkBase {
         // make sure only one sst file left, so all get invocation will access 
this single file,
         // to prevent the spike caused by different key distribution in 
multiple sst files,
         // the more access to the older sst file, the lower throughput will be.
-        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
-            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
-                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, STATE_DESC);
-        }
+        compactState(keyedStateBackend, STATE_DESC);
     }
 
     @TearDown(Level.Iteration)
@@ -98,11 +93,7 @@ public class ListStateBenchmark extends StateBenchmarkBase {
                     state.clear();
                 });
         // make the clearance effective, trigger compaction for RocksDB, and 
GC for heap.
-        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
-            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
-                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, STATE_DESC);
-        } else {
+        if (!compactState(keyedStateBackend, STATE_DESC)) {
             System.gc();
         }
         // wait a while for the clearance to take effect.
diff --git 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
index 1977cf6..829b440 100644
--- 
a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
+++ 
b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.flink.state.benchmark.ttl;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
 import org.apache.flink.state.benchmark.StateBenchmarkBase;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Level;
@@ -82,11 +81,7 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
         // make sure only one sst file left, so all get invocation will access 
this single file,
         // to prevent the spike caused by different key distribution in 
multiple sst files,
         // the more access to the older sst file, the lower throughput will be.
-        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
-            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
-                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, stateDesc);
-        }
+        compactState(keyedStateBackend, stateDesc);
         advanceTimePerIteration();
     }
 
@@ -100,11 +95,7 @@ public class TtlListStateBenchmark extends 
TtlStateBenchmarkBase {
                     state.clear();
                 });
         // make the clearance effective, trigger compaction for RocksDB, and 
GC for heap.
-        if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {
-            RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
-                    (RocksDBKeyedStateBackend<Long>) keyedStateBackend;
-            compactState(rocksDBKeyedStateBackend, stateDesc);
-        } else {
+        if (!compactState(keyedStateBackend, stateDesc)) {
             System.gc();
         }
         // wait a while for the clearance to take effect.

Reply via email to