This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 42580dc59 [FLINK-37676][common] Add caching mechanism to Table ID
Selectors for better performance (#3994)
42580dc59 is described below
commit 42580dc59fa066a935ca29abdafed148f3d86467
Author: Tianzhu Wen <[email protected]>
AuthorDate: Mon Jan 5 10:26:55 2026 +0800
[FLINK-37676][common] Add caching mechanism to Table ID Selectors for
better performance (#3994)
---
.../apache/flink/cdc/common/schema/Selectors.java | 26 +++-
.../flink-cdc-pipeline-e2e-tests/pom.xml | 16 +++
.../tests/benchmark/SelectorsBenchmark.java | 145 +++++++++++++++++++++
3 files changed, 186 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
index e23d43db0..af686753d 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
@@ -20,6 +20,10 @@ package org.apache.flink.cdc.common.schema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.Predicates;
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -29,8 +33,16 @@ import java.util.function.Predicate;
/** Selectors for filtering tables. */
public class Selectors {
+ private static final Duration CACHE_EXPIRE_DURATION = Duration.ofHours(1);
+
private List<Selector> selectors;
+ private final Cache<TableId, Boolean> cache =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(CACHE_EXPIRE_DURATION)
+ .maximumSize(1024)
+ .build();
+
private Selectors() {}
/**
@@ -71,8 +83,20 @@ public class Selectors {
}
}
- /** Match the {@link TableId} against the {@link Selector}s. * */
+ /** Match the {@link TableId} against the {@link Selector}s. */
public boolean isMatch(TableId tableId) {
+     // A null lookup means this table ID has not been evaluated yet, or its entry has since
+     // been evicted; only then are the selectors consulted and the outcome stored.
+     Boolean known = cache.getIfPresent(tableId);
+     if (known == null) {
+         known = computeIsMatch(tableId);
+         cache.put(tableId, known);
+     }
+     return known;
+ }
+
+ /** Computes the match result if not present in the cache. */
+ private boolean computeIsMatch(TableId tableId) {
for (Selector selector : selectors) {
if (selector.isMatch(tableId)) {
return true;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index ff3478cb4..6e12e79d5 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -43,6 +43,7 @@ limitations under the License.
<iceberg.version>1.6.1</iceberg.version>
<hive.version>2.3.9</hive.version>
<fluss.version>0.7.0</fluss.version>
+ <jmh.version>1.37</jmh.version>
<hudi.version>1.1.0</hudi.version>
</properties>
@@ -236,6 +237,21 @@ limitations under the License.
<scope>test</scope>
</dependency>
+ <!-- benchmark -->
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/benchmark/SelectorsBenchmark.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/benchmark/SelectorsBenchmark.java
new file mode 100644
index 000000000..4dfa2790b
--- /dev/null
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/benchmark/SelectorsBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.benchmark;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark for table selector performance with and without cache.
+ *
+ * <pre>
+ * Benchmark Mode Cnt
Score Error Units
+ * SelectorsBenchmark.testSelectorWithCache thrpt 20
1028.979 ± 218.663 ops/ms
+ * SelectorsBenchmark.testSelectorWithoutCache thrpt 20
136.747 ± 11.872 ops/ms
+ * </pre>
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(2)
+@Threads(2)
+public class SelectorsBenchmark {
+
+ private Selectors selectors;
+ private List<TableId> queryTableIds;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ selectors =
+ new Selectors.SelectorsBuilder()
+ .includeTables(
+
"test_wms_inventory_[a-z]+.inventory_batch_detail,"
+ +
"test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
+ +
"test_wms_inventory_[a-z]+.inventory_batch_input,"
+ +
"test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
+ +
"test_wms_inventory_[a-z]+.inventory_snapshot,"
+ +
"test_wms_log_[a-z]+.log_common_log_[a-z]+")
+ .build();
+
+ queryTableIds =
+ ImmutableList.of(
+ TableId.tableId(
+
"test_wms_common_europe.occupy_strategy_exe_progress_order"),
+
TableId.tableId("test_wms_common_europe.wave_strategy_rule_relation"),
+ TableId.tableId("db.sc2.A1"),
+ TableId.tableId("db2.sc2.A1"),
+
TableId.tableId("test_wms_output_s.out_moment_storage_location_relation"),
+
TableId.tableId("test_wms_output_a.out_moment_storage_location_relation"));
+
+ // warm up cache
+ for (TableId id : queryTableIds) {
+ selectors.isMatch(id);
+ }
+ }
+
+ /**
+ * Benchmark to evaluate the performance of table selector with caching
enabled.
+ *
+ * <p>This benchmark measures throughput when using a pre-built {@link
Selectors} instance that
+ * leverages internal caching mechanisms. This simulates a typical usage
scenario where selector
+ * rules are initialized once and reused across multiple queries.
+ *
+ * <p>Expected to perform significantly better than non-cached version due
to avoidance of
+ * repeated regex parsing and compilation.
+ */
+ @Benchmark
+ public void testSelectorWithCache() {
+ for (TableId id : queryTableIds) {
+ selectors.isMatch(id);
+ }
+ }
+
+ /**
+ * Benchmark to evaluate the performance of table selector without using
cache.
+ *
+ * <p>This benchmark constructs a new {@link Selectors} instance for each
invocation, simulating
+ * a cold-start or ad-hoc usage scenario. The overhead includes pattern
parsing and matcher
+ * construction, which significantly impacts throughput.
+ *
+ * <p>Useful for understanding worst-case performance and comparing
against the cached version.
+ */
+ @Benchmark
+ public void testSelectorWithoutCache() {
+ Selectors freshSelectors =
+ new Selectors.SelectorsBuilder()
+ .includeTables(
+
"test_wms_inventory_[a-z]+.inventory_batch_detail,"
+ +
"test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
+ +
"test_wms_inventory_[a-z]+.inventory_batch_input,"
+ +
"test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
+ +
"test_wms_inventory_[a-z]+.inventory_snapshot,"
+ +
"test_wms_log_[a-z]+.log_common_log_[a-z]+")
+ .build();
+ for (TableId id : queryTableIds) {
+ freshSelectors.isMatch(id);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options =
+ new OptionsBuilder()
+ .include(SelectorsBenchmark.class.getSimpleName())
+ .detectJvmArgs()
+ .build();
+ new Runner(options).run();
+ }
+}