This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new b598054  [FLINK-34215] FLIP-377: Support fine-grained configuration to 
control filter push down for JDBC Connector
b598054 is described below

commit b598054c9cb5cc0174993058a53990e5d7d1827b
Author: Jiabao Sun <[email protected]>
AuthorDate: Thu Jan 25 11:41:54 2024 +0800

    [FLINK-34215] FLIP-377: Support fine-grained configuration to control 
filter push down for JDBC Connector
---
 docs/content.zh/docs/connectors/table/jdbc.md      | 12 +++++
 docs/content/docs/connectors/table/jdbc.md         | 16 ++++++-
 .../jdbc/core/table/FilterHandlingPolicy.java      | 33 ++++++++++++++
 .../jdbc/core/table/JdbcConnectorOptions.java      |  7 +++
 .../jdbc/core/table/JdbcDynamicTableFactory.java   |  3 ++
 .../core/table/source/JdbcDynamicTableSource.java  | 51 +++++++++++++++-------
 .../core/table/JdbcDynamicTableFactoryTest.java    |  6 +++
 .../jdbc/core/table/JdbcTablePlanTest.java         |  9 ++++
 .../table/source/JdbcDynamicTableSourceITCase.java |  9 ++--
 .../jdbc/core/table/JdbcTablePlanTest.xml          | 18 ++++++++
 10 files changed, 144 insertions(+), 20 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/jdbc.md 
b/docs/content.zh/docs/connectors/table/jdbc.md
index 3c04dbe..fdb9326 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -258,6 +258,18 @@ ON myTopic.key = MyUserTable.id;
       <td>Integer</td>
       <td>查询数据库失败的最大重试次数。</td>
     </tr>
+    <tr>
+      <td><h5>filter.handling.policy</h5></td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">always</td>
+      <td>枚举值,可选项: always, never</td>
+      <td>过滤器下推策略,支持的策略有:
+          <ul>
+            <li><code>always</code>: 始终将支持的过滤器下推到数据库。</li>
+            <li><code>never</code>: 不将任何过滤器下推到数据库。</li>
+          </ul>
+      </td>
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>可选</td>
diff --git a/docs/content/docs/connectors/table/jdbc.md 
b/docs/content/docs/connectors/table/jdbc.md
index 258e996..b0ef84e 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -274,6 +274,20 @@ Connector Options
       <td>Integer</td>
       <td>The max retry times if lookup database failed.</td>
     </tr>
+    <tr>
+      <td><h5>filter.handling.policy</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">always</td>
+      <td>Enum Possible values: always, never</td>
+      <td>Fine-grained configuration to control filter push down. 
+          Supported policies are:
+          <ul>
+            <li><code>always</code>: Always push the supported filters to the 
database.</li>
+            <li><code>never</code>: Never push any filters to the database.</li>
+          </ul>
+      </td>
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>
@@ -305,7 +319,7 @@ Connector Options
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the JDBC sink operator. By default, the 
parallelism is determined by the framework using the same parallelism of the 
upstream chained operator.</td>
-    </tr>          
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java
new file mode 100644
index 0000000..7fe72f9
--- /dev/null
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java
@@ -0,0 +1,33 @@
+package org.apache.flink.connector.jdbc.core.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Fine-grained configuration to control filter push down for jdbc Table/SQL 
source. */
+@PublicEvolving
+public enum FilterHandlingPolicy implements DescribedEnum {
+    ALWAYS("always", text("Always push the supported filters to database.")),
+
+    NEVER("never", text("Never push any filters to database."));
+
+    private final String name;
+    private final InlineElement description;
+
+    FilterHandlingPolicy(String name, InlineElement description) {
+        this.name = name;
+        this.description = description;
+    }
+
+    @Override
+    public InlineElement getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
index 35c6023..37d5362 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
@@ -179,5 +179,12 @@ public class JdbcConnectorOptions {
                     .defaultValue(3)
                     .withDescription("The max retry times if writing records 
to database failed.");
 
+    public static final ConfigOption<FilterHandlingPolicy> 
FILTER_HANDLING_POLICY =
+            ConfigOptions.key("filter.handling.policy")
+                    .enumType(FilterHandlingPolicy.class)
+                    .defaultValue(FilterHandlingPolicy.ALWAYS)
+                    .withDescription(
+                            "Fine-grained configuration to control filter push 
down for jdbc Table/SQL source.");
+
     protected JdbcConnectorOptions() {}
 }
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index 56a0dca..e1c732e 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -54,6 +54,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.COMPATIBLE_MODE;
 import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.DRIVER;
+import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.FILTER_HANDLING_POLICY;
 import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
 import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
 import static 
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;
@@ -128,6 +129,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
                 getJdbcReadOptions(helper.getOptions()),
                 helper.getOptions().get(LookupOptions.MAX_RETRIES),
                 getLookupCache(config),
+                helper.getOptions().get(FILTER_HANDLING_POLICY),
                 context.getPhysicalRowDataType());
     }
 
@@ -262,6 +264,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
         optionalOptions.add(LookupOptions.MAX_RETRIES);
+        optionalOptions.add(FILTER_HANDLING_POLICY);
         return optionalOptions;
     }
 
diff --git 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index 11bdd22..d26898a 100644
--- 
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
+++ 
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
 import 
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
 import 
org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
@@ -53,6 +54,7 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -72,6 +74,7 @@ public class JdbcDynamicTableSource
     private final JdbcReadOptions readOptions;
     private final int lookupMaxRetryTimes;
     @Nullable private final LookupCache cache;
+    private final FilterHandlingPolicy filterHandlingPolicy;
     private DataType physicalRowDataType;
     private final String dialectName;
     private long limit = -1;
@@ -83,11 +86,13 @@ public class JdbcDynamicTableSource
             JdbcReadOptions readOptions,
             int lookupMaxRetryTimes,
             @Nullable LookupCache cache,
+            FilterHandlingPolicy filterHandlingPolicy,
             DataType physicalRowDataType) {
         this.options = options;
         this.readOptions = readOptions;
         this.lookupMaxRetryTimes = lookupMaxRetryTimes;
         this.cache = cache;
+        this.filterHandlingPolicy = filterHandlingPolicy;
         this.physicalRowDataType = physicalRowDataType;
         this.dialectName = options.getDialect().dialectName();
     }
@@ -208,7 +213,12 @@ public class JdbcDynamicTableSource
     public DynamicTableSource copy() {
         JdbcDynamicTableSource newSource =
                 new JdbcDynamicTableSource(
-                        options, readOptions, lookupMaxRetryTimes, cache, 
physicalRowDataType);
+                        options,
+                        readOptions,
+                        lookupMaxRetryTimes,
+                        cache,
+                        filterHandlingPolicy,
+                        physicalRowDataType);
         newSource.resolvedPredicates = new 
ArrayList<>(this.resolvedPredicates);
         newSource.pushdownParams = Arrays.copyOf(this.pushdownParams, 
this.pushdownParams.length);
         return newSource;
@@ -231,6 +241,7 @@ public class JdbcDynamicTableSource
         return Objects.equals(options, that.options)
                 && Objects.equals(readOptions, that.readOptions)
                 && Objects.equals(lookupMaxRetryTimes, 
that.lookupMaxRetryTimes)
+                && Objects.equals(filterHandlingPolicy, 
that.filterHandlingPolicy)
                 && Objects.equals(cache, that.cache)
                 && Objects.equals(physicalRowDataType, 
that.physicalRowDataType)
                 && Objects.equals(dialectName, that.dialectName)
@@ -246,6 +257,7 @@ public class JdbcDynamicTableSource
                 readOptions,
                 lookupMaxRetryTimes,
                 cache,
+                filterHandlingPolicy,
                 physicalRowDataType,
                 dialectName,
                 limit,
@@ -260,22 +272,29 @@ public class JdbcDynamicTableSource
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
-        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
-        List<ResolvedExpression> remainingFilters = new ArrayList<>();
-
-        for (ResolvedExpression filter : filters) {
-            Optional<ParameterizedPredicate> simplePredicate = 
parseFilterToPredicate(filter);
-            if (simplePredicate.isPresent()) {
-                acceptedFilters.add(filter);
-                ParameterizedPredicate pred = simplePredicate.get();
-                this.pushdownParams = ArrayUtils.addAll(this.pushdownParams, 
pred.getParameters());
-                this.resolvedPredicates.add(pred.getPredicate());
-            } else {
-                remainingFilters.add(filter);
-            }
+        switch (filterHandlingPolicy) {
+            case NEVER:
+                return Result.of(Collections.emptyList(), filters);
+            case ALWAYS:
+            default:
+                List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+                List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+                for (ResolvedExpression filter : filters) {
+                    Optional<ParameterizedPredicate> simplePredicate =
+                            parseFilterToPredicate(filter);
+                    if (simplePredicate.isPresent()) {
+                        acceptedFilters.add(filter);
+                        ParameterizedPredicate pred = simplePredicate.get();
+                        this.pushdownParams =
+                                ArrayUtils.addAll(this.pushdownParams, 
pred.getParameters());
+                        this.resolvedPredicates.add(pred.getPredicate());
+                    } else {
+                        remainingFilters.add(filter);
+                    }
+                }
+                return Result.of(acceptedFilters, remainingFilters);
         }
-
-        return Result.of(acceptedFilters, remainingFilters);
     }
 
     private Optional<ParameterizedPredicate> 
parseFilterToPredicate(ResolvedExpression filter) {
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index e9bf702..4e799a6 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -71,6 +71,7 @@ class JdbcDynamicTableFactoryTest {
         properties.put("username", "user");
         properties.put("password", "pass");
         properties.put("connection.max-retry-timeout", "120s");
+        properties.put("filter.handling.policy", "never");
 
         // validation for source
         DynamicTableSource actualSource = createTableSource(SCHEMA, 
properties);
@@ -89,6 +90,7 @@ class JdbcDynamicTableFactoryTest {
                         JdbcReadOptions.builder().build(),
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
+                        FilterHandlingPolicy.NEVER,
                         SCHEMA.toPhysicalRowDataType());
         assertThat(actualSource).isEqualTo(expectedSource);
 
@@ -146,6 +148,7 @@ class JdbcDynamicTableFactoryTest {
                         readOptions,
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         null,
+                        
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());
 
         assertThat(actual).isEqualTo(expected);
@@ -174,6 +177,7 @@ class JdbcDynamicTableFactoryTest {
                         JdbcReadOptions.builder().build(),
                         10,
                         
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
+                        
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());
 
         assertThat(actual).isEqualTo(expected);
@@ -202,6 +206,7 @@ class JdbcDynamicTableFactoryTest {
                                 .maximumSize(1000L)
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                 .build(),
+                        
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());
 
         assertThat(actual).isEqualTo(expected);
@@ -387,6 +392,7 @@ class JdbcDynamicTableFactoryTest {
                                 .maximumSize(1000L)
                                 .expireAfterWrite(Duration.ofSeconds(10))
                                 .build(),
+                        
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());
 
         assertThat(actual).isEqualTo(expected);
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
index ad4558d..0e58700 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
@@ -52,6 +52,9 @@ class JdbcTablePlanTest extends TableTestBase {
                                 + "  'url'='jdbc:derby:memory:test',"
                                 + "  'table-name'='test_table'"
                                 + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE jdbc_never_pushdown WITH 
('filter.handling.policy' = 'never') LIKE jdbc;");
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE  d ( "
@@ -98,6 +101,12 @@ class JdbcTablePlanTest extends TableTestBase {
                 "SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
     }
 
+    @Test
+    void testNeverFilterPushdown() {
+        util.verifyExecPlan(
+                "SELECT id, time_col, real_col FROM jdbc_never_pushdown WHERE 
id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
+    }
+
     /**
      * Note the join condition is not present in the optimized plan, see 
FLINK-34170, as it is
      * handled in the JDBC java code, where it adds the join conditions to the 
select statement
diff --git 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
index b05a038..083ec9e 100644
--- 
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
+++ 
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.jdbc.core.table.source;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
 import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
@@ -195,8 +196,9 @@ public abstract class JdbcDynamicTableSourceITCase 
implements DatabaseTest {
                 .containsAll(collected);
     }
 
-    @Test
-    public void testFilter() {
+    @ParameterizedTest
+    @EnumSource(FilterHandlingPolicy.class)
+    void testFilter(FilterHandlingPolicy filterHandlingPolicy) {
         String testTable = "testTable";
         tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), 
testTable));
 
@@ -210,7 +212,8 @@ public abstract class JdbcDynamicTableSourceITCase 
implements DatabaseTest {
                                 "'scan.partition.column'='id'",
                                 "'scan.partition.num'='1'",
                                 "'scan.partition.lower-bound'='1'",
-                                "'scan.partition.upper-bound'='1'")));
+                                "'scan.partition.upper-bound'='1'",
+                                "'filter.handling.policy'='" + 
filterHandlingPolicy.name() + "'")));
 
         // we create a VIEW here to test column remapping, ie. would filter 
push down work if we
         // create a view that depends on our source table
diff --git 
a/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
 
b/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
index f05f5fb..5a6e746 100644
--- 
a/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
+++ 
b/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
@@ -65,6 +65,24 @@ LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
         <Resource name="optimized exec plan">
             <![CDATA[
 TableSourceScan(table=[[default_catalog, default_database, jdbc, 
filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), 
OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))], 
project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testNeverFilterPushdown">
+        <Resource name="sql">
+            <![CDATA[SELECT id, time_col, real_col FROM jdbc_never_pushdown 
WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5, 
-1000.23:DECIMAL(6, 2)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
jdbc_never_pushdown]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[id, time_col, real_col], where=[(((id = 900001) OR (double_col >= 
-1000.23)) AND ((time_col <> 11:11:11) OR (double_col >= -1000.23)))])
++- TableSourceScan(table=[[default_catalog, default_database, 
jdbc_never_pushdown, filter=[], project=[id, time_col, real_col, double_col]]], 
fields=[id, time_col, real_col, double_col])
 ]]>
         </Resource>
     </TestCase>

Reply via email to