This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 42580dc59 [FLINK-37676][common] Add caching mechanism to Table ID Selectors for better performance (#3994)
42580dc59 is described below

commit 42580dc59fa066a935ca29abdafed148f3d86467
Author: Tianzhu Wen <[email protected]>
AuthorDate: Mon Jan 5 10:26:55 2026 +0800

    [FLINK-37676][common] Add caching mechanism to Table ID Selectors for better performance (#3994)
---
 .../apache/flink/cdc/common/schema/Selectors.java  |  26 +++-
 .../flink-cdc-pipeline-e2e-tests/pom.xml           |  16 +++
 .../tests/benchmark/SelectorsBenchmark.java        | 145 +++++++++++++++++++++
 3 files changed, 186 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
index e23d43db0..af686753d 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Selectors.java
@@ -20,6 +20,10 @@ package org.apache.flink.cdc.common.schema;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.utils.Predicates;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -29,8 +33,16 @@ import java.util.function.Predicate;
 /** Selectors for filtering tables. */
 public class Selectors {
 
+    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofHours(1);
+
     private List<Selector> selectors;
 
+    private final Cache<TableId, Boolean> cache =
+            CacheBuilder.newBuilder()
+                    .expireAfterAccess(CACHE_EXPIRE_DURATION)
+                    .maximumSize(1024)
+                    .build();
+
     private Selectors() {}
 
     /**
@@ -71,8 +83,20 @@ public class Selectors {
         }
     }
 
-    /** Match the {@link TableId} against the {@link Selector}s. * */
+    /** Match the {@link TableId} against the {@link Selector}s. */
     public boolean isMatch(TableId tableId) {
+        Boolean cachedResult = cache.getIfPresent(tableId);
+        if (cachedResult != null) {
+            return cachedResult;
+        }
+
+        boolean match = computeIsMatch(tableId);
+        cache.put(tableId, match);
+        return match;
+    }
+
+    /** Evaluates the {@link Selector}s directly; result caching is done by the caller. */
+    private boolean computeIsMatch(TableId tableId) {
         for (Selector selector : selectors) {
             if (selector.isMatch(tableId)) {
                 return true;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index ff3478cb4..6e12e79d5 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -43,6 +43,7 @@ limitations under the License.
         <iceberg.version>1.6.1</iceberg.version>
         <hive.version>2.3.9</hive.version>
         <fluss.version>0.7.0</fluss.version>
+        <jmh.version>1.37</jmh.version>
         <hudi.version>1.1.0</hudi.version>
     </properties>
 
@@ -236,6 +237,21 @@ limitations under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- JMH micro-benchmark harness (used by SelectorsBenchmark) -->
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <version>${jmh.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <version>${jmh.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- test dependencies on TestContainers -->
         <dependency>
             <groupId>org.testcontainers</groupId>
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/benchmark/SelectorsBenchmark.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/benchmark/SelectorsBenchmark.java
new file mode 100644
index 000000000..4dfa2790b
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/benchmark/SelectorsBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.benchmark;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark for table selector performance with and without cache.
+ *
+ * <pre>
+ * Benchmark                                                   Mode  Cnt     Score     Error   Units
+ * SelectorsBenchmark.testSelectorWithCache                   thrpt   20  1028.979 ± 218.663  ops/ms
+ * SelectorsBenchmark.testSelectorWithoutCache                thrpt   20   136.747 ±  11.872  ops/ms
+ * </pre>
+ */
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(2)
+@Threads(2)
+public class SelectorsBenchmark {
+
+    private Selectors selectors;
+    private List<TableId> queryTableIds;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        selectors =
+                new Selectors.SelectorsBuilder()
+                        .includeTables(
+                                "test_wms_inventory_[a-z]+.inventory_batch_detail,"
+                                        + "test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
+                                        + "test_wms_inventory_[a-z]+.inventory_batch_input,"
+                                        + "test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
+                                        + "test_wms_inventory_[a-z]+.inventory_snapshot,"
+                                        + "test_wms_log_[a-z]+.log_common_log_[a-z]+")
+                        .build();
+
+        queryTableIds =
+                ImmutableList.of(
+                        TableId.tableId(
+                                "test_wms_common_europe.occupy_strategy_exe_progress_order"),
+                        TableId.tableId("test_wms_common_europe.wave_strategy_rule_relation"),
+                        TableId.tableId("db.sc2.A1"),
+                        TableId.tableId("db2.sc2.A1"),
+                        TableId.tableId("test_wms_output_s.out_moment_storage_location_relation"),
+                        TableId.tableId("test_wms_output_a.out_moment_storage_location_relation"));
+
+        // warm up cache
+        for (TableId id : queryTableIds) {
+            selectors.isMatch(id);
+        }
+    }
+
+    /**
+     * Measures throughput of a pre-built {@link Selectors} instance whose result cache was warmed
+     * in {@link #setup()}, i.e. the steady state where rules are built once and reused.
+     *
+     * <p>Note: compared with {@link #testSelectorWithoutCache}, this avoids both rebuilding the
+     * selectors (parsing/compiling patterns) and re-evaluating them, so the gap between the two
+     * numbers is not attributable to the result cache alone. Results are passed to the {@link
+     * Blackhole} so the JIT cannot eliminate the lookups as dead code.
+     */
+    @Benchmark
+    public void testSelectorWithCache(Blackhole bh) {
+        for (TableId id : queryTableIds) {
+            bh.consume(selectors.isMatch(id));
+        }
+    }
+
+    /**
+     * Benchmark to evaluate the performance of table selector without using cache.
+     *
+     * <p>This benchmark constructs a new {@link Selectors} instance for each invocation, simulating
+     * a cold-start or ad-hoc usage scenario. The overhead includes pattern parsing and matcher
+     * construction, which significantly impacts throughput.
+     *
+     * <p>Useful for understanding worst-case performance and comparing against the cached version.
+     */
+    @Benchmark
+    public void testSelectorWithoutCache(Blackhole bh) {
+        Selectors freshSelectors =
+                new Selectors.SelectorsBuilder()
+                        .includeTables(
+                                "test_wms_inventory_[a-z]+.inventory_batch_detail,"
+                                        + "test_wms_inventory_[a-z]+.inventory_batch_detail_record,"
+                                        + "test_wms_inventory_[a-z]+.inventory_batch_input,"
+                                        + "test_wms_inventory_[a-z]+.inventory_flow_volume_level,"
+                                        + "test_wms_inventory_[a-z]+.inventory_snapshot,"
+                                        + "test_wms_log_[a-z]+.log_common_log_[a-z]+")
+                        .build();
+        for (TableId id : queryTableIds) {
+            bh.consume(freshSelectors.isMatch(id));
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options =
+                new OptionsBuilder()
+                        .include(SelectorsBenchmark.class.getSimpleName())
+                        .detectJvmArgs()
+                        .build();
+        new Runner(options).run();
+    }
+}

Reply via email to