This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new b598054 [FLINK-34215] FLIP-377: Support fine-grained configuration to
control filter push down for JDBC Connector
b598054 is described below
commit b598054c9cb5cc0174993058a53990e5d7d1827b
Author: Jiabao Sun <[email protected]>
AuthorDate: Thu Jan 25 11:41:54 2024 +0800
[FLINK-34215] FLIP-377: Support fine-grained configuration to control
filter push down for JDBC Connector
---
docs/content.zh/docs/connectors/table/jdbc.md | 12 +++++
docs/content/docs/connectors/table/jdbc.md | 16 ++++++-
.../jdbc/core/table/FilterHandlingPolicy.java | 33 ++++++++++++++
.../jdbc/core/table/JdbcConnectorOptions.java | 7 +++
.../jdbc/core/table/JdbcDynamicTableFactory.java | 3 ++
.../core/table/source/JdbcDynamicTableSource.java | 51 +++++++++++++++-------
.../core/table/JdbcDynamicTableFactoryTest.java | 6 +++
.../jdbc/core/table/JdbcTablePlanTest.java | 9 ++++
.../table/source/JdbcDynamicTableSourceITCase.java | 9 ++--
.../jdbc/core/table/JdbcTablePlanTest.xml | 18 ++++++++
10 files changed, 144 insertions(+), 20 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/jdbc.md
b/docs/content.zh/docs/connectors/table/jdbc.md
index 3c04dbe..fdb9326 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -258,6 +258,18 @@ ON myTopic.key = MyUserTable.id;
<td>Integer</td>
<td>查询数据库失败的最大重试次数。</td>
</tr>
+ <tr>
+ <td><h5>filter.handling.policy</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">always</td>
+ <td>枚举值,可选项: always, never</td>
+ <td>过滤器下推策略,支持的策略有:
+ <ul>
+ <li><code>always</code>: 始终将支持的过滤器下推到数据库。</li>
+ <li><code>never</code>: 不将任何过滤器下推到数据库。</li>
+ </ul>
+ </td>
+ </tr>
<tr>
<td><h5>sink.buffer-flush.max-rows</h5></td>
<td>可选</td>
diff --git a/docs/content/docs/connectors/table/jdbc.md
b/docs/content/docs/connectors/table/jdbc.md
index 258e996..b0ef84e 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -274,6 +274,20 @@ Connector Options
<td>Integer</td>
<td>The max retry times if lookup database failed.</td>
</tr>
+ <tr>
+ <td><h5>filter.handling.policy</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">always</td>
+ <td>Enum. Possible values: always, never</td>
+ <td>Fine-grained configuration to control filter push down.
+ Supported policies are:
+ <ul>
+ <li><code>always</code>: Always push the supported filters to
database.</li>
+ <li><code>never</code>: Never push any filters to database.</li>
+ </ul>
+ </td>
+ </tr>
<tr>
<td><h5>sink.buffer-flush.max-rows</h5></td>
<td>optional</td>
@@ -305,7 +319,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the JDBC sink operator. By default, the
parallelism is determined by the framework using the same parallelism of the
upstream chained operator.</td>
- </tr>
+ </tr>
</tbody>
</table>
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java
new file mode 100644
index 0000000..7fe72f9
--- /dev/null
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/FilterHandlingPolicy.java
@@ -0,0 +1,33 @@
+package org.apache.flink.connector.jdbc.core.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Fine-grained configuration to control filter push down for JDBC Table/SQL
source. */
+@PublicEvolving
+public enum FilterHandlingPolicy implements DescribedEnum {
+ ALWAYS("always", text("Always push the supported filters to database.")),
+
+ NEVER("never", text("Never push any filters to database."));
+
+ private final String name;
+ private final InlineElement description;
+
+ FilterHandlingPolicy(String name, InlineElement description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
index 35c6023..37d5362 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcConnectorOptions.java
@@ -179,5 +179,12 @@ public class JdbcConnectorOptions {
.defaultValue(3)
.withDescription("The max retry times if writing records
to database failed.");
+ public static final ConfigOption<FilterHandlingPolicy>
FILTER_HANDLING_POLICY =
+ ConfigOptions.key("filter.handling.policy")
+ .enumType(FilterHandlingPolicy.class)
+ .defaultValue(FilterHandlingPolicy.ALWAYS)
+ .withDescription(
+ "Fine-grained configuration to control filter push
down for jdbc Table/SQL source.");
+
protected JdbcConnectorOptions() {}
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
index 56a0dca..e1c732e 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java
@@ -54,6 +54,7 @@ import java.util.stream.Stream;
import static
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.COMPATIBLE_MODE;
import static
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.DRIVER;
+import static
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.FILTER_HANDLING_POLICY;
import static
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
import static
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_MISSING_KEY;
import static
org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions.LOOKUP_CACHE_TTL;
@@ -128,6 +129,7 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
getJdbcReadOptions(helper.getOptions()),
helper.getOptions().get(LookupOptions.MAX_RETRIES),
getLookupCache(config),
+ helper.getOptions().get(FILTER_HANDLING_POLICY),
context.getPhysicalRowDataType());
}
@@ -262,6 +264,7 @@ public class JdbcDynamicTableFactory implements
DynamicTableSourceFactory, Dynam
optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
optionalOptions.add(LookupOptions.MAX_RETRIES);
+ optionalOptions.add(FILTER_HANDLING_POLICY);
return optionalOptions;
}
diff --git
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
index 11bdd22..d26898a 100644
---
a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
+++
b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.core.table.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
import
org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
import
org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider;
@@ -53,6 +54,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -72,6 +74,7 @@ public class JdbcDynamicTableSource
private final JdbcReadOptions readOptions;
private final int lookupMaxRetryTimes;
@Nullable private final LookupCache cache;
+ private final FilterHandlingPolicy filterHandlingPolicy;
private DataType physicalRowDataType;
private final String dialectName;
private long limit = -1;
@@ -83,11 +86,13 @@ public class JdbcDynamicTableSource
JdbcReadOptions readOptions,
int lookupMaxRetryTimes,
@Nullable LookupCache cache,
+ FilterHandlingPolicy filterHandlingPolicy,
DataType physicalRowDataType) {
this.options = options;
this.readOptions = readOptions;
this.lookupMaxRetryTimes = lookupMaxRetryTimes;
this.cache = cache;
+ this.filterHandlingPolicy = filterHandlingPolicy;
this.physicalRowDataType = physicalRowDataType;
this.dialectName = options.getDialect().dialectName();
}
@@ -208,7 +213,12 @@ public class JdbcDynamicTableSource
public DynamicTableSource copy() {
JdbcDynamicTableSource newSource =
new JdbcDynamicTableSource(
- options, readOptions, lookupMaxRetryTimes, cache,
physicalRowDataType);
+ options,
+ readOptions,
+ lookupMaxRetryTimes,
+ cache,
+ filterHandlingPolicy,
+ physicalRowDataType);
newSource.resolvedPredicates = new
ArrayList<>(this.resolvedPredicates);
newSource.pushdownParams = Arrays.copyOf(this.pushdownParams,
this.pushdownParams.length);
return newSource;
@@ -231,6 +241,7 @@ public class JdbcDynamicTableSource
return Objects.equals(options, that.options)
&& Objects.equals(readOptions, that.readOptions)
&& Objects.equals(lookupMaxRetryTimes,
that.lookupMaxRetryTimes)
+ && Objects.equals(filterHandlingPolicy,
that.filterHandlingPolicy)
&& Objects.equals(cache, that.cache)
&& Objects.equals(physicalRowDataType,
that.physicalRowDataType)
&& Objects.equals(dialectName, that.dialectName)
@@ -246,6 +257,7 @@ public class JdbcDynamicTableSource
readOptions,
lookupMaxRetryTimes,
cache,
+ filterHandlingPolicy,
physicalRowDataType,
dialectName,
limit,
@@ -260,22 +272,29 @@ public class JdbcDynamicTableSource
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
- List<ResolvedExpression> acceptedFilters = new ArrayList<>();
- List<ResolvedExpression> remainingFilters = new ArrayList<>();
-
- for (ResolvedExpression filter : filters) {
- Optional<ParameterizedPredicate> simplePredicate =
parseFilterToPredicate(filter);
- if (simplePredicate.isPresent()) {
- acceptedFilters.add(filter);
- ParameterizedPredicate pred = simplePredicate.get();
- this.pushdownParams = ArrayUtils.addAll(this.pushdownParams,
pred.getParameters());
- this.resolvedPredicates.add(pred.getPredicate());
- } else {
- remainingFilters.add(filter);
- }
+ switch (filterHandlingPolicy) {
+ case NEVER:
+ return Result.of(Collections.emptyList(), filters);
+ case ALWAYS:
+ default:
+ List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+ for (ResolvedExpression filter : filters) {
+ Optional<ParameterizedPredicate> simplePredicate =
+ parseFilterToPredicate(filter);
+ if (simplePredicate.isPresent()) {
+ acceptedFilters.add(filter);
+ ParameterizedPredicate pred = simplePredicate.get();
+ this.pushdownParams =
+ ArrayUtils.addAll(this.pushdownParams,
pred.getParameters());
+ this.resolvedPredicates.add(pred.getPredicate());
+ } else {
+ remainingFilters.add(filter);
+ }
+ }
+ return Result.of(acceptedFilters, remainingFilters);
}
-
- return Result.of(acceptedFilters, remainingFilters);
}
private Optional<ParameterizedPredicate>
parseFilterToPredicate(ResolvedExpression filter) {
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
index e9bf702..4e799a6 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactoryTest.java
@@ -71,6 +71,7 @@ class JdbcDynamicTableFactoryTest {
properties.put("username", "user");
properties.put("password", "pass");
properties.put("connection.max-retry-timeout", "120s");
+ properties.put("filter.handling.policy", "never");
// validation for source
DynamicTableSource actualSource = createTableSource(SCHEMA,
properties);
@@ -89,6 +90,7 @@ class JdbcDynamicTableFactoryTest {
JdbcReadOptions.builder().build(),
LookupOptions.MAX_RETRIES.defaultValue(),
null,
+ FilterHandlingPolicy.NEVER,
SCHEMA.toPhysicalRowDataType());
assertThat(actualSource).isEqualTo(expectedSource);
@@ -146,6 +148,7 @@ class JdbcDynamicTableFactoryTest {
readOptions,
LookupOptions.MAX_RETRIES.defaultValue(),
null,
+
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
SCHEMA.toPhysicalRowDataType());
assertThat(actual).isEqualTo(expected);
@@ -174,6 +177,7 @@ class JdbcDynamicTableFactoryTest {
JdbcReadOptions.builder().build(),
10,
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
+
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
SCHEMA.toPhysicalRowDataType());
assertThat(actual).isEqualTo(expected);
@@ -202,6 +206,7 @@ class JdbcDynamicTableFactoryTest {
.maximumSize(1000L)
.expireAfterWrite(Duration.ofSeconds(10))
.build(),
+
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
SCHEMA.toPhysicalRowDataType());
assertThat(actual).isEqualTo(expected);
@@ -387,6 +392,7 @@ class JdbcDynamicTableFactoryTest {
.maximumSize(1000L)
.expireAfterWrite(Duration.ofSeconds(10))
.build(),
+
JdbcConnectorOptions.FILTER_HANDLING_POLICY.defaultValue(),
SCHEMA.toPhysicalRowDataType());
assertThat(actual).isEqualTo(expected);
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
index ad4558d..0e58700 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.java
@@ -52,6 +52,9 @@ class JdbcTablePlanTest extends TableTestBase {
+ " 'url'='jdbc:derby:memory:test',"
+ " 'table-name'='test_table'"
+ ")");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE jdbc_never_pushdown WITH
('filter.handling.policy' = 'never') LIKE jdbc;");
util.tableEnv()
.executeSql(
"CREATE TABLE d ( "
@@ -98,6 +101,12 @@ class JdbcTablePlanTest extends TableTestBase {
"SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
}
+ @Test
+ void testNeverFilterPushdown() {
+ util.verifyExecPlan(
+ "SELECT id, time_col, real_col FROM jdbc_never_pushdown WHERE
id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23");
+ }
+
/**
* Note the join condition is not present in the optimized plan, see
FLINK-34170, as it is
* handled in the JDBC java code, where it adds the join conditions to the
select statement
diff --git
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
index b05a038..083ec9e 100644
---
a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
+++
b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSourceITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.jdbc.core.table.source;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy;
import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
@@ -195,8 +196,9 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
.containsAll(collected);
}
- @Test
- public void testFilter() {
+ @ParameterizedTest
+ @EnumSource(FilterHandlingPolicy.class)
+ void testFilter(FilterHandlingPolicy filterHandlingPolicy) {
String testTable = "testTable";
tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(),
testTable));
@@ -210,7 +212,8 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
"'scan.partition.column'='id'",
"'scan.partition.num'='1'",
"'scan.partition.lower-bound'='1'",
- "'scan.partition.upper-bound'='1'")));
+ "'scan.partition.upper-bound'='1'",
+ "'filter.handling.policy'='" +
filterHandlingPolicy.name() + "'")));
// we create a VIEW here to test column remapping, ie. would filter
push down work if we
// create a view that depends on our source table
diff --git
a/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
b/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
index f05f5fb..5a6e746 100644
---
a/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
+++
b/flink-connector-jdbc-core/src/test/resources/org/apache/flink/connector/jdbc/core/table/JdbcTablePlanTest.xml
@@ -65,6 +65,24 @@ LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
<Resource name="optimized exec plan">
<![CDATA[
TableSourceScan(table=[[default_catalog, default_database, jdbc,
filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))),
OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))],
project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNeverFilterPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, time_col, real_col FROM jdbc_never_pushdown
WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5,
-1000.23:DECIMAL(6, 2)))])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
jdbc_never_pushdown]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, time_col, real_col], where=[(((id = 900001) OR (double_col >=
-1000.23)) AND ((time_col <> 11:11:11) OR (double_col >= -1000.23)))])
++- TableSourceScan(table=[[default_catalog, default_database,
jdbc_never_pushdown, filter=[], project=[id, time_col, real_col, double_col]]],
fields=[id, time_col, real_col, double_col])
]]>
</Resource>
</TestCase>