This is an automated email from the ASF dual-hosted git repository.
jiabaosun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
new 424d57a [FLINK-34216][connectors/mongodb] FLIP-377: Support
fine-grained configuration to control filter push down for MongoDB Connector
(#23)
424d57a is described below
commit 424d57a54c69d9d0e0fde4219e46bf5fcf457b59
Author: Jiabao Sun <[email protected]>
AuthorDate: Tue Aug 6 09:41:39 2024 +0800
[FLINK-34216][connectors/mongodb] FLIP-377: Support fine-grained
configuration to control filter push down for MongoDB Connector (#23)
---
docs/content.zh/docs/connectors/table/mongodb.md | 13 ++++
docs/content/docs/connectors/table/mongodb.md | 14 +++++
.../mongodb/table/FilterHandlingPolicy.java | 33 ++++++++++
.../mongodb/table/MongoConnectorOptions.java | 7 +++
.../mongodb/table/MongoDynamicTableFactory.java | 3 +
.../mongodb/table/MongoDynamicTableSource.java | 57 ++++++++++-------
.../mongodb/table/config/MongoConfiguration.java | 6 ++
.../table/MongoDynamicTableFactoryTest.java | 5 ++
.../table/MongoDynamicTableSourceITCase.java | 11 +++-
.../mongodb/table/MongoTablePlanTest.java | 71 +++++++++++++++-------
.../connector/mongodb/table/MongoTablePlanTest.xml | 18 ++++++
11 files changed, 193 insertions(+), 45 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/mongodb.md
b/docs/content.zh/docs/connectors/table/mongodb.md
index dde8363..1c078f0 100644
--- a/docs/content.zh/docs/connectors/table/mongodb.md
+++ b/docs/content.zh/docs/connectors/table/mongodb.md
@@ -231,6 +231,19 @@ ON myTopic.key = MyUserTable._id;
<td>Duration</td>
<td>查询数据库失败的最大重试时间。</td>
</tr>
+ <tr>
+ <td><h5>filter.handling.policy</h5></td>
+ <td>可选</td>
+ <td>否</td>
+ <td style="word-wrap: break-word;">always</td>
+ <td>枚举值,可选项: always, never</td>
+ <td>过滤器下推策略,支持的策略有:
+ <ul>
+ <li><code>always</code>: 始终将支持的过滤器下推到数据库.</li>
+ <li><code>never</code>: 不将任何过滤器下推到数据库.</li>
+ </ul>
+ </td>
+ </tr>
<tr>
<td><h5>sink.buffer-flush.max-rows</h5></td>
<td>可选</td>
diff --git a/docs/content/docs/connectors/table/mongodb.md
b/docs/content/docs/connectors/table/mongodb.md
index 340a925..8fcd35c 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -236,6 +236,20 @@ Connector Options
<td>Duration</td>
<td>Specifies the retry time interval if lookup records from database
failed.</td>
</tr>
+ <tr>
+ <td><h5>filter.handling.policy</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">always</td>
+ <td>Enum Possible values: always, never</td>
+ <td>Fine-grained configuration to control filter push down.
+ Supported policies are:
+ <ul>
+ <li><code>always</code>: Always push the supported filters to
MongoDB.</li>
+ <li><code>never</code>: Never push any filters to MongoDB.</li>
+ </ul>
+ </td>
+ </tr>
<tr>
<td><h5>sink.buffer-flush.max-rows</h5></td>
<td>optional</td>
diff --git
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/FilterHandlingPolicy.java
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/FilterHandlingPolicy.java
new file mode 100644
index 0000000..7e12e02
--- /dev/null
+++
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/FilterHandlingPolicy.java
@@ -0,0 +1,33 @@
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Fine-grained configuration to control filter push down for MongoDB
Table/SQL source. */
+@PublicEvolving
+public enum FilterHandlingPolicy implements DescribedEnum {
+ ALWAYS("always", text("Always push the supported filters to MongoDB.")),
+
+ NEVER("never", text("Never push any filters to MongoDB."));
+
+ private final String name;
+ private final InlineElement description;
+
+ FilterHandlingPolicy(String name, InlineElement description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
index a91c489..a579eaa 100644
---
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
+++
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
@@ -139,4 +139,11 @@ public class MongoConnectorOptions {
.defaultValue(Duration.ofMillis(1000L))
.withDescription(
"Specifies the retry time interval if writing
records to database failed.");
+
+ public static final ConfigOption<FilterHandlingPolicy>
FILTER_HANDLING_POLICY =
+ ConfigOptions.key("filter.handling.policy")
+ .enumType(FilterHandlingPolicy.class)
+ .defaultValue(FilterHandlingPolicy.ALWAYS)
+ .withDescription(
+ "Fine-grained configuration to control filter push
down for MongoDB Table/SQL source.");
}
diff --git
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
index 59b8ba5..055c6b7 100644
---
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
+++
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
@@ -43,6 +43,7 @@ import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUF
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
@@ -99,6 +100,7 @@ public class MongoDynamicTableFactory
optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+ optionalOptions.add(FILTER_HANDLING_POLICY);
return optionalOptions;
}
@@ -132,6 +134,7 @@ public class MongoDynamicTableFactory
getLookupCache(options),
config.getLookupMaxRetries(),
config.getLookupRetryIntervalMs(),
+ config.getFilterHandlingPolicy(),
context.getPhysicalRowDataType());
}
diff --git
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
index a20f99a..229fb17 100644
---
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
+++
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -74,6 +75,7 @@ public class MongoDynamicTableSource
@Nullable private final LookupCache lookupCache;
private final int lookupMaxRetries;
private final long lookupRetryIntervalMs;
+ private final FilterHandlingPolicy filterHandlingPolicy;
private DataType producedDataType;
private int limit = -1;
@@ -85,6 +87,7 @@ public class MongoDynamicTableSource
@Nullable LookupCache lookupCache,
int lookupMaxRetries,
long lookupRetryIntervalMs,
+ FilterHandlingPolicy filterHandlingPolicy,
DataType producedDataType) {
this.connectionOptions = connectionOptions;
this.readOptions = readOptions;
@@ -99,6 +102,7 @@ public class MongoDynamicTableSource
String.format("The '%s' must be larger than 0.",
LOOKUP_RETRY_INTERVAL.key()));
this.lookupMaxRetries = lookupMaxRetries;
this.lookupRetryIntervalMs = lookupRetryIntervalMs;
+ this.filterHandlingPolicy = filterHandlingPolicy;
this.producedDataType = producedDataType;
}
@@ -170,6 +174,7 @@ public class MongoDynamicTableSource
lookupCache,
lookupMaxRetries,
lookupRetryIntervalMs,
+ filterHandlingPolicy,
producedDataType);
newSource.filter = BsonDocument.parse(filter.toJson());
return newSource;
@@ -198,28 +203,36 @@ public class MongoDynamicTableSource
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
- List<ResolvedExpression> acceptedFilters = new ArrayList<>();
- List<ResolvedExpression> remainingFilters = new ArrayList<>();
+ switch (filterHandlingPolicy) {
+ case NEVER:
+ return Result.of(Collections.emptyList(), filters);
+ case ALWAYS:
+ default:
+ List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new ArrayList<>();
- List<Bson> mongoFilters = new ArrayList<>();
- for (ResolvedExpression filter : filters) {
- BsonDocument simpleFilter = parseFilter(filter);
- if (simpleFilter.isEmpty()) {
- remainingFilters.add(filter);
- } else {
- acceptedFilters.add(filter);
- mongoFilters.add(simpleFilter);
- }
- }
+ List<Bson> mongoFilters = new ArrayList<>();
+ for (ResolvedExpression filter : filters) {
+ BsonDocument simpleFilter = parseFilter(filter);
+ if (simpleFilter.isEmpty()) {
+ remainingFilters.add(filter);
+ } else {
+ acceptedFilters.add(filter);
+ mongoFilters.add(simpleFilter);
+ }
+ }
- if (!mongoFilters.isEmpty()) {
- Bson mergedFilter =
- mongoFilters.size() == 1 ? mongoFilters.get(0) :
Filters.and(mongoFilters);
- this.filter = mergedFilter.toBsonDocument();
- LOG.info("Pushed down filters: {}", filter.toJson());
- }
+ if (!mongoFilters.isEmpty()) {
+ Bson mergedFilter =
+ mongoFilters.size() == 1
+ ? mongoFilters.get(0)
+ : Filters.and(mongoFilters);
+ this.filter = mergedFilter.toBsonDocument();
+ LOG.info("Pushed down filters: {}", filter.toJson());
+ }
- return Result.of(acceptedFilters, remainingFilters);
+ return Result.of(acceptedFilters, remainingFilters);
+ }
}
static BsonDocument parseFilter(ResolvedExpression filter) {
@@ -244,7 +257,8 @@ public class MongoDynamicTableSource
&& Objects.equals(filter, that.filter)
&& Objects.equals(lookupCache, that.lookupCache)
&& Objects.equals(lookupMaxRetries, that.lookupMaxRetries)
- && Objects.equals(lookupRetryIntervalMs,
that.lookupRetryIntervalMs);
+ && Objects.equals(lookupRetryIntervalMs,
that.lookupRetryIntervalMs)
+ && Objects.equals(filterHandlingPolicy,
that.filterHandlingPolicy);
}
@Override
@@ -257,6 +271,7 @@ public class MongoDynamicTableSource
filter,
lookupCache,
lookupMaxRetries,
- lookupRetryIntervalMs);
+ lookupRetryIntervalMs,
+ filterHandlingPolicy);
}
}
diff --git
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
index c141ca2..753b429 100644
---
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
+++
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+import org.apache.flink.connector.mongodb.table.FilterHandlingPolicy;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import javax.annotation.Nullable;
@@ -33,6 +34,7 @@ import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUF
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
@@ -97,6 +99,10 @@ public class MongoConfiguration {
return config.get(LOOKUP_RETRY_INTERVAL).toMillis();
}
+ public FilterHandlingPolicy getFilterHandlingPolicy() {
+ return config.get(FILTER_HANDLING_POLICY);
+ }
+
// -----------------------------------Write
Config------------------------------------------
public int getBufferFlushMaxRows() {
return config.get(BUFFER_FLUSH_MAX_ROWS);
diff --git
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
index c9af0fc..3dae596 100644
---
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
+++
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
@@ -46,6 +46,7 @@ import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUF
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
@@ -89,6 +90,7 @@ public class MongoDynamicTableFactoryTest {
null,
LookupOptions.MAX_RETRIES.defaultValue(),
LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+ FILTER_HANDLING_POLICY.defaultValue(),
SCHEMA.toPhysicalRowDataType());
assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -115,6 +117,7 @@ public class MongoDynamicTableFactoryTest {
properties.put(SCAN_PARTITION_STRATEGY.key(), "split-vector");
properties.put(SCAN_PARTITION_SIZE.key(), "128m");
properties.put(SCAN_PARTITION_SAMPLES.key(), "5");
+ properties.put(FILTER_HANDLING_POLICY.key(), "never");
DynamicTableSource actual = createTableSource(SCHEMA, properties);
@@ -135,6 +138,7 @@ public class MongoDynamicTableFactoryTest {
null,
LookupOptions.MAX_RETRIES.defaultValue(),
LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+ FilterHandlingPolicy.NEVER,
SCHEMA.toPhysicalRowDataType());
assertThat(actual).isEqualTo(expected);
@@ -162,6 +166,7 @@ public class MongoDynamicTableFactoryTest {
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
10,
20,
+ FILTER_HANDLING_POLICY.defaultValue(),
SCHEMA.toPhysicalRowDataType());
assertThat(actual).isEqualTo(expected);
diff --git
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
index 74e2de1..062b374 100644
---
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
+++
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
@@ -81,6 +81,7 @@ import java.util.stream.Collectors;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
import static
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.assertj.core.api.Assertions.assertThat;
@@ -241,9 +242,13 @@ class MongoDynamicTableSourceITCase {
}
}
- @Test
- void testFilter() {
- tEnv.executeSql(createTestDDl(null));
+ @ParameterizedTest
+ @EnumSource(FilterHandlingPolicy.class)
+ void testFilter(FilterHandlingPolicy filterHandlingPolicy) {
+ tEnv.executeSql(
+ createTestDDl(
+ Collections.singletonMap(
+ FILTER_HANDLING_POLICY.key(),
filterHandlingPolicy.name())));
// we create a VIEW here to test column remapping, i.e. would filter
push down work if we
// create a view that depends on our source table
diff --git
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
index 133cbe7..4636d60 100644
---
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
+++
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
@@ -17,7 +17,10 @@
package org.apache.flink.connector.mongodb.table;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
@@ -28,55 +31,81 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.rules.TestName;
import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
/** Plan tests for Mongo connector, for example, testing projection push down.
*/
-public class MongoTablePlanTest extends TableTestBase {
+class MongoTablePlanTest extends TableTestBase {
private final StreamTableTestUtil util =
streamTestUtil(TableConfig.getDefault());
private TestInfo testInfo;
@BeforeEach
- public void setup(TestInfo testInfo) {
+ void setup(TestInfo testInfo) {
this.testInfo = testInfo;
TableEnvironment tEnv = util.tableEnv();
tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
- tEnv.executeSql(
- "CREATE TABLE mongo ("
- + "id BIGINT,"
- + "description VARCHAR(200),"
- + "boolean_col BOOLEAN,"
- + "timestamp_col TIMESTAMP_LTZ(0),"
- + "timestamp3_col TIMESTAMP_LTZ(3),"
- + "int_col INTEGER,"
- + "double_col DOUBLE,"
- + "decimal_col DECIMAL(10, 4)"
- + ") WITH ("
- + " 'connector'='mongodb',"
- + " 'uri'='mongodb://127.0.0.1:27017',"
- + " 'database'='test_db',"
- + " 'collection'='test_coll'"
- + ")");
}
@Test
- public void testFilterPushdown() {
+ void testFilterPushdown() {
+ createTestTable();
util.verifyExecPlan(
"SELECT id, timestamp3_col, int_col FROM mongo WHERE id =
900001 AND timestamp3_col <> TIMESTAMP '2022-09-07 10:25:28.127' OR double_col
>= -1000.23");
}
@Test
- public void testFilterPartialPushdown() {
+ void testFilterPartialPushdown() {
+ createTestTable();
util.verifyExecPlan(
"SELECT id, timestamp3_col, int_col FROM mongo WHERE id =
900001 AND boolean_col = (decimal_col > 2.0)");
}
@Test
- public void testFilterCannotPushdown() {
+ void testFilterCannotPushdown() {
+ createTestTable();
util.verifyExecPlan(
"SELECT id, timestamp3_col, int_col FROM mongo WHERE id IS NOT
NULL OR double_col = decimal_col");
}
+ @Test
+ void testNeverFilterPushdown() {
+ createTestTable(
+ Collections.singletonMap(
+ MongoConnectorOptions.FILTER_HANDLING_POLICY.key(),
+ FilterHandlingPolicy.NEVER.name()));
+ util.verifyExecPlan(
+ "SELECT id, timestamp3_col, int_col FROM mongo WHERE id =
900001 AND decimal_col > 1.0");
+ }
+
+ private void createTestTable() {
+ createTestTable(Collections.emptyMap());
+ }
+
+ private void createTestTable(Map<String, String> extraOptions) {
+ TableDescriptor.Builder builder =
+ TableDescriptor.forConnector("mongodb")
+ .option("uri", "mongodb://127.0.0.1:27017")
+ .option("database", "test_db")
+ .option("collection", "test_coll")
+ .schema(
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("description",
DataTypes.VARCHAR(200))
+ .column("boolean_col",
DataTypes.BOOLEAN())
+ .column("timestamp_col",
DataTypes.TIMESTAMP_LTZ(0))
+ .column("timestamp3_col",
DataTypes.TIMESTAMP_LTZ(3))
+ .column("int_col", DataTypes.INT())
+ .column("double_col",
DataTypes.DOUBLE())
+ .column("decimal_col",
DataTypes.DECIMAL(10, 4))
+ .build());
+
+ extraOptions.forEach(builder::option);
+
+ util.tableEnv().createTable("mongo", builder.build());
+ }
+
// A workaround to get the test method name for flink versions not
completely migrated to JUnit5
public TestName name() {
return new TestName() {
diff --git
a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
index b2c57e4..b40cb1f 100644
---
a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
+++
b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
@@ -66,6 +66,24 @@ LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
<![CDATA[
Calc(select=[id, timestamp3_col, int_col], where=[(id IS NOT NULL OR
(double_col = CAST(decimal_col AS DOUBLE)))])
+- TableSourceScan(table=[[default_catalog, default_database, mongo,
filter=[], project=[id, timestamp3_col, int_col, double_col, decimal_col]]],
fields=[id, timestamp3_col, int_col, double_col, decimal_col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNeverFilterPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001
AND decimal_col > 1.0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[AND(=($0, 900001), >($7, 1.0:DECIMAL(2, 1)))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[CAST(900001 AS BIGINT) AS id, timestamp3_col, int_col],
where=[((id = 900001) AND (decimal_col > 1.0))])
++- TableSourceScan(table=[[default_catalog, default_database, mongo,
filter=[], project=[id, timestamp3_col, int_col, decimal_col]]], fields=[id,
timestamp3_col, int_col, decimal_col])
]]>
</Resource>
</TestCase>