This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git
The following commit(s) were added to refs/heads/main by this push:
new fbabbd6 [FLINK-33257][connectors/mongodb] Support filter pushdown in MongoDB connector
fbabbd6 is described below
commit fbabbd6b2d535c1c3fe37636e476f1a52990fc48
Author: Jiabao Sun <[email protected]>
AuthorDate: Fri Jan 26 00:10:17 2024 +0800
[FLINK-33257][connectors/mongodb] Support filter pushdown in MongoDB connector
This closes #17.
---
docs/content.zh/docs/connectors/table/mongodb.md | 56 ++++
docs/content/docs/connectors/table/mongodb.md | 56 ++++
flink-connector-mongodb/pom.xml | 10 +-
.../connector/mongodb/source/MongoSource.java | 7 +
.../mongodb/source/MongoSourceBuilder.java | 17 ++
.../reader/split/MongoScanSourceSplitReader.java | 7 +-
.../mongodb/table/MongoDynamicTableSource.java | 69 ++++-
.../mongodb/table/MongoFilterPushDownVisitor.java | 249 ++++++++++++++++
.../mongodb/source/MongoSourceITCase.java | 11 +-
.../table/MongoDynamicTableSourceITCase.java | 291 +++++++++++++------
.../table/MongoFilterPushDownVisitorTest.java | 319 +++++++++++++++++++++
.../mongodb/table/MongoTablePlanTest.java | 89 ++++++
.../connector/mongodb/table/MongoTablePlanTest.xml | 72 +++++
pom.xml | 10 +-
14 files changed, 1159 insertions(+), 104 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/mongodb.md b/docs/content.zh/docs/connectors/table/mongodb.md
index fd0ba4e..c111481 100644
--- a/docs/content.zh/docs/connectors/table/mongodb.md
+++ b/docs/content.zh/docs/connectors/table/mongodb.md
@@ -340,6 +340,62 @@ lookup cache 的主要目的是用于提高时态表关联 MongoDB 连接器的
如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。
强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。
+### 过滤器下推
+
+MongoDB 支持将 Flink SQL 的简单比较和逻辑过滤器下推以优化查询。
+Flink SQL 过滤器到 MongoDB 查询操作符的映射如下表所示。
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Flink SQL filters</th>
      <th class="text-left"><a href="https://www.mongodb.com/docs/manual/reference/operator/query/">MongoDB Query Operators</a></th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>=</code></td>
+ <td><code>$eq</code></td>
+ </tr>
+ <tr>
+ <td><code><></code></td>
+ <td><code>$ne</code></td>
+ </tr>
+ <tr>
+ <td><code>></code></td>
+ <td><code>$gt</code></td>
+ </tr>
+ <tr>
+ <td><code>>=</code></td>
+ <td><code>$gte</code></td>
+ </tr>
+ <tr>
+ <td><code><</code></td>
+ <td><code>$lt</code></td>
+ </tr>
+ <tr>
+ <td><code><=</code></td>
+ <td><code>$lte</code></td>
+ </tr>
+ <tr>
+ <td><code>IS NULL</code></td>
+ <td><code>$eq : null</code></td>
+ </tr>
+ <tr>
+ <td><code>IS NOT NULL</code></td>
+ <td><code>$ne : null</code></td>
+ </tr>
+ <tr>
+ <td><code>OR</code></td>
+ <td><code>$or</code></td>
+ </tr>
+ <tr>
+ <td><code>AND</code></td>
+ <td><code>$and</code></td>
+ </tr>
+ </tbody>
+</table>
+
数据类型映射
----------------
MongoDB BSON 类型到 Flink SQL 数据类型的映射如下表所示。
diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md
index 9a5dfd6..7ce3bdf 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -371,6 +371,62 @@ If there are failures, the Flink job will recover and re-process from last succe
which can lead to re-processing messages during recovery. The upsert mode is highly recommended as
it helps avoid constraint violations or duplicate data if records need to be re-processed.
+### Filters Pushdown
+
+MongoDB supports pushing down simple comparisons and logical filters to optimize queries.
+The mappings from Flink SQL filters to MongoDB query operators are listed in the following table.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Flink SQL filters</th>
      <th class="text-left"><a href="https://www.mongodb.com/docs/manual/reference/operator/query/">MongoDB Query Operators</a></th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>=</code></td>
+ <td><code>$eq</code></td>
+ </tr>
+ <tr>
+ <td><code><></code></td>
+ <td><code>$ne</code></td>
+ </tr>
+ <tr>
+ <td><code>></code></td>
+ <td><code>$gt</code></td>
+ </tr>
+ <tr>
+ <td><code>>=</code></td>
+ <td><code>$gte</code></td>
+ </tr>
+ <tr>
+ <td><code><</code></td>
+ <td><code>$lt</code></td>
+ </tr>
+ <tr>
+ <td><code><=</code></td>
+ <td><code>$lte</code></td>
+ </tr>
+ <tr>
+ <td><code>IS NULL</code></td>
+ <td><code>$eq : null</code></td>
+ </tr>
+ <tr>
+ <td><code>IS NOT NULL</code></td>
+ <td><code>$ne : null</code></td>
+ </tr>
+ <tr>
+ <td><code>OR</code></td>
+ <td><code>$or</code></td>
+ </tr>
+ <tr>
+ <td><code>AND</code></td>
+ <td><code>$and</code></td>
+ </tr>
+ </tbody>
+</table>
+
Data Type Mapping
----------------
The field data type mappings from MongoDB BSON types to Flink SQL data types are listed in the following table.
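
For illustration only (not part of this patch): a minimal Java sketch of what the mapping above means in practice. The table name, schema, and connection values are hypothetical placeholders; the WHERE clause below uses only operators from the table, so the whole predicate can be pushed into the MongoDB query instead of being evaluated in Flink.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class FilterPushdownSqlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Hypothetical table; the options mirror the DDL used in the tests of this commit.
            tEnv.executeSql(
                    "CREATE TABLE mongo_source (id BIGINT, description STRING) WITH ("
                            + " 'connector' = 'mongodb',"
                            + " 'uri' = 'mongodb://127.0.0.1:27017',"
                            + " 'database' = 'test_db',"
                            + " 'collection' = 'test_coll')");
            // id > 6 maps to $gt and description IS NOT NULL maps to $ne : null (see table above).
            tEnv.executeSql("SELECT * FROM mongo_source WHERE id > 6 AND description IS NOT NULL").print();
        }
    }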
diff --git a/flink-connector-mongodb/pom.xml b/flink-connector-mongodb/pom.xml
index 7eb4182..e8959a8 100644
--- a/flink-connector-mongodb/pom.xml
+++ b/flink-connector-mongodb/pom.xml
@@ -115,11 +115,19 @@ under the License.
<!-- Table API integration tests -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-loader</artifactId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
index 6b1b89c..bbec562 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
@@ -46,6 +46,7 @@ import org.apache.flink.connector.mongodb.source.split.MongoSourceSplitSerialize
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
import javax.annotation.Nullable;
@@ -87,6 +88,9 @@ public class MongoSource<OUT>
/** The projections for MongoDB source. */
@Nullable private final List<String> projectedFields;
+ /** The filter for MongoDB source. */
+ private final BsonDocument filter;
+
/** The limit for MongoDB source. */
private final int limit;
@@ -100,11 +104,13 @@ public class MongoSource<OUT>
MongoConnectionOptions connectionOptions,
MongoReadOptions readOptions,
@Nullable List<String> projectedFields,
+ Bson filter,
int limit,
MongoDeserializationSchema<OUT> deserializationSchema) {
this.connectionOptions = checkNotNull(connectionOptions);
this.readOptions = checkNotNull(readOptions);
this.projectedFields = projectedFields;
+ this.filter = filter.toBsonDocument();
this.limit = limit;
// Only support bounded mode for now.
// We can implement unbounded mode by ChangeStream future.
@@ -140,6 +146,7 @@ public class MongoSource<OUT>
connectionOptions,
readOptions,
projectedFields,
+ filter,
mongoReaderContext);
return new MongoSourceReader<>(
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java
index c5252eb..6aa0af4 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java
@@ -25,7 +25,9 @@ import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionSt
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.connector.mongodb.source.reader.split.MongoScanSourceSplitReader;
+import com.mongodb.client.model.Filters;
import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
import java.util.Arrays;
import java.util.List;
@@ -47,6 +49,9 @@ public class MongoSourceBuilder<OUT> {
private final MongoReadOptions.MongoReadOptionsBuilder readOptionsBuilder;
private List<String> projectedFields;
+
+ private Bson filter = Filters.empty();
+
private int limit = -1;
private MongoDeserializationSchema<OUT> deserializationSchema;
@@ -168,6 +173,17 @@ public class MongoSourceBuilder<OUT> {
return this;
}
+ /**
+ * Sets the filter of documents to read.
+ *
+ * @param filter the filter of documents to read.
+ * @return this builder
+ */
+ public MongoSourceBuilder<OUT> setFilter(Bson filter) {
+ this.filter = checkNotNull(filter, "The filter must not be null");
+ return this;
+ }
+
/**
* Sets the projection fields of documents to read.
*
@@ -216,6 +232,7 @@ public class MongoSourceBuilder<OUT> {
connectionOptionsBuilder.build(),
readOptionsBuilder.build(),
projectedFields,
+ filter,
limit,
deserializationSchema);
}
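
For illustration only (not part of this patch): a minimal sketch of the new builder option at the DataStream level. It assumes the existing MongoSourceBuilder setters (setUri, setDatabase, setCollection, setLimit, setDeserializationSchema) and a caller-supplied deserialization schema; the URI, database, collection, and field name are placeholders.

    import com.mongodb.client.model.Filters;
    import org.apache.flink.connector.mongodb.source.MongoSource;
    import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
    import org.bson.BsonInt32;

    public class FilteredMongoSourceSketch {
        // Only documents matching the filter are read by the scan split readers.
        static MongoSource<String> filteredSource(MongoDeserializationSchema<String> deserializer) {
            return MongoSource.<String>builder()
                    .setUri("mongodb://127.0.0.1:27017")
                    .setDatabase("test_db")
                    .setCollection("test_coll")
                    .setFilter(Filters.gt("f0", new BsonInt32(10000)))
                    .setLimit(100)
                    .setDeserializationSchema(deserializer)
                    .build();
        }
    }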
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
index e521c91..26c584e 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
@@ -36,6 +36,7 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,8 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
private final MongoSourceReaderContext readerContext;
@Nullable private final List<String> projectedFields;
+ private final Bson filter;
+
private boolean closed = false;
private boolean finished = false;
private MongoClient mongoClient;
@@ -67,10 +70,12 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
MongoConnectionOptions connectionOptions,
MongoReadOptions readOptions,
@Nullable List<String> projectedFields,
+ Bson filter,
MongoSourceReaderContext readerContext) {
this.connectionOptions = connectionOptions;
this.readOptions = readOptions;
this.projectedFields = projectedFields;
+ this.filter = filter;
this.readerContext = readerContext;
}
@@ -174,7 +179,7 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
mongoClient
.getDatabase(connectionOptions.getDatabase())
                        .getCollection(connectionOptions.getCollection(), BsonDocument.class)
- .find()
+ .find(filter)
.min(currentSplit.getMin())
.max(currentSplit.getMax())
.hint(currentSplit.getHint())
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
index d3e2a53..a20f99a 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
@@ -36,10 +37,18 @@ import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import com.mongodb.client.model.Filters;
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -55,8 +64,11 @@ public class MongoDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
+ SupportsFilterPushDown,
SupportsLimitPushDown {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSource.class);
+
private final MongoConnectionOptions connectionOptions;
private final MongoReadOptions readOptions;
@Nullable private final LookupCache lookupCache;
@@ -65,6 +77,8 @@ public class MongoDynamicTableSource
private DataType producedDataType;
private int limit = -1;
+ private BsonDocument filter = Filters.empty().toBsonDocument();
+
public MongoDynamicTableSource(
MongoConnectionOptions connectionOptions,
MongoReadOptions readOptions,
@@ -135,6 +149,7 @@ public class MongoDynamicTableSource
.setSamplesPerPartition(readOptions.getSamplesPerPartition())
.setLimit(limit)
.setProjectedFields(DataType.getFieldNames(producedDataType))
+ .setFilter(filter)
.setDeserializationSchema(deserializationSchema)
.build();
@@ -148,13 +163,16 @@ public class MongoDynamicTableSource
@Override
public DynamicTableSource copy() {
- return new MongoDynamicTableSource(
- connectionOptions,
- readOptions,
- lookupCache,
- lookupMaxRetries,
- lookupRetryIntervalMs,
- producedDataType);
+ MongoDynamicTableSource newSource =
+ new MongoDynamicTableSource(
+ connectionOptions,
+ readOptions,
+ lookupCache,
+ lookupMaxRetries,
+ lookupRetryIntervalMs,
+ producedDataType);
+ newSource.filter = BsonDocument.parse(filter.toJson());
+ return newSource;
}
@Override
@@ -178,6 +196,41 @@ public class MongoDynamicTableSource
this.producedDataType = producedDataType;
}
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+ List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+ List<Bson> mongoFilters = new ArrayList<>();
+ for (ResolvedExpression filter : filters) {
+ BsonDocument simpleFilter = parseFilter(filter);
+ if (simpleFilter.isEmpty()) {
+ remainingFilters.add(filter);
+ } else {
+ acceptedFilters.add(filter);
+ mongoFilters.add(simpleFilter);
+ }
+ }
+
+ if (!mongoFilters.isEmpty()) {
+ Bson mergedFilter =
+                    mongoFilters.size() == 1 ? mongoFilters.get(0) : Filters.and(mongoFilters);
+ this.filter = mergedFilter.toBsonDocument();
+ LOG.info("Pushed down filters: {}", filter.toJson());
+ }
+
+ return Result.of(acceptedFilters, remainingFilters);
+ }
+
+ static BsonDocument parseFilter(ResolvedExpression filter) {
+ if (filter instanceof CallExpression) {
+ CallExpression callExp = (CallExpression) filter;
+ return MongoFilterPushDownVisitor.INSTANCE.visit(callExp);
+ } else {
+ return Filters.empty().toBsonDocument();
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof MongoDynamicTableSource)) {
@@ -188,6 +241,7 @@ public class MongoDynamicTableSource
&& Objects.equals(readOptions, that.readOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(limit, that.limit)
+ && Objects.equals(filter, that.filter)
&& Objects.equals(lookupCache, that.lookupCache)
&& Objects.equals(lookupMaxRetries, that.lookupMaxRetries)
                && Objects.equals(lookupRetryIntervalMs, that.lookupRetryIntervalMs);
@@ -200,6 +254,7 @@ public class MongoDynamicTableSource
readOptions,
producedDataType,
limit,
+ filter,
lookupCache,
lookupMaxRetries,
lookupRetryIntervalMs);
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java
new file mode 100644
index 0000000..a8735b8
--- /dev/null
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.mongodb.client.model.Filters;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonType;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.conversions.Bson;
+import org.bson.types.Decimal128;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Visitor that convert Expression to Bson filter. Return {@link Filters#empty()} if we cannot push
+ * down the filter.
+ */
+@Experimental
+public class MongoFilterPushDownVisitor extends ExpressionDefaultVisitor<BsonValue> {
+
+    public static final MongoFilterPushDownVisitor INSTANCE = new MongoFilterPushDownVisitor();
+
+ private MongoFilterPushDownVisitor() {}
+
+ @Override
+ public BsonDocument visit(CallExpression call) {
+ Bson filter = Filters.empty();
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$eq", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$lt", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$lte", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$gt", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$gte", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$ne", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
+            filter =
+                    renderUnaryComparisonOperator(
+                            "$eq", call.getResolvedChildren().get(0), BsonNull.VALUE);
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
+            filter =
+                    renderUnaryComparisonOperator(
+                            "$ne", call.getResolvedChildren().get(0), BsonNull.VALUE);
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            filter = renderLogicalOperator("$or", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            filter = renderLogicalOperator("$and", call.getResolvedChildren());
+        }
+ return filter.toBsonDocument();
+ }
+
+    private Bson renderBinaryComparisonOperator(
+            String operator, List<ResolvedExpression> expressions) {
+        Optional<FieldReferenceExpression> fieldReferenceExpr =
+                extractExpression(expressions, FieldReferenceExpression.class);
+        Optional<ValueLiteralExpression> fieldValueExpr =
+                extractExpression(expressions, ValueLiteralExpression.class);
+
+ // Nested complex expressions are not supported. e.g. f1 = (f2 > 2)
+ if (!fieldReferenceExpr.isPresent() || !fieldValueExpr.isPresent()) {
+ return Filters.empty();
+ }
+
+ String fieldName = visit(fieldReferenceExpr.get()).getValue();
+ BsonValue fieldValue = visit(fieldValueExpr.get());
+
+ // Unsupported values
+ if (fieldValue.getBsonType() == BsonType.UNDEFINED) {
+ return Filters.empty();
+ }
+
+ switch (operator) {
+ case "$eq":
+ return Filters.eq(fieldName, fieldValue);
+ case "$lt":
+ return Filters.lt(fieldName, fieldValue);
+ case "$lte":
+ return Filters.lte(fieldName, fieldValue);
+ case "$gt":
+ return Filters.gt(fieldName, fieldValue);
+ case "$gte":
+ return Filters.gte(fieldName, fieldValue);
+ case "$ne":
+ return Filters.ne(fieldName, fieldValue);
+ default:
+ return Filters.empty();
+ }
+ }
+
+    private Bson renderUnaryComparisonOperator(
+            String operator, ResolvedExpression operand, BsonValue value) {
+        if (operand instanceof FieldReferenceExpression) {
+            String fieldName = visit((FieldReferenceExpression) operand).getValue();
+ switch (operator) {
+ case "$eq":
+ return Filters.eq(fieldName, value);
+ case "$ne":
+ return Filters.ne(fieldName, value);
+ default:
+ return Filters.empty();
+ }
+ } else {
+ return Filters.empty();
+ }
+ }
+
+    private Bson renderLogicalOperator(String operator, List<ResolvedExpression> operands) {
+ Bson[] filters = new Bson[operands.size()];
+ for (int i = 0; i < operands.size(); i++) {
+ ResolvedExpression operand = operands.get(i);
+ BsonValue filter = operand.accept(this);
+
+ // sub-filters that cannot be pushed down
+ if (!filter.isDocument() || filter.asDocument().isEmpty()) {
+ return Filters.empty();
+ }
+ filters[i] = filter.asDocument();
+ }
+
+ switch (operator) {
+ case "$or":
+ return Filters.or(filters);
+ case "$and":
+ return Filters.and(filters);
+ default:
+ return Filters.empty();
+ }
+ }
+
+ @Override
+ public BsonValue visit(ValueLiteralExpression litExp) {
+ LogicalType type = litExp.getOutputDataType().getLogicalType();
+ Optional<BsonValue> value;
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ value = litExp.getValueAs(String.class).map(BsonString::new);
+ break;
+ case BOOLEAN:
+ value = litExp.getValueAs(Boolean.class).map(BsonBoolean::new);
+ break;
+ case DECIMAL:
+ value =
+ litExp.getValueAs(BigDecimal.class)
+ .map(Decimal128::new)
+ .map(BsonDecimal128::new);
+ break;
+ case INTEGER:
+ value = litExp.getValueAs(Integer.class).map(BsonInt32::new);
+ break;
+ case BIGINT:
+ value = litExp.getValueAs(Long.class).map(BsonInt64::new);
+ break;
+ case DOUBLE:
+ value = litExp.getValueAs(Double.class).map(BsonDouble::new);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ value =
+ litExp.getValueAs(LocalDateTime.class)
+ .map(Timestamp::valueOf)
+ .map(Timestamp::getTime)
+ .map(BsonDateTime::new);
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ value =
+ litExp.getValueAs(Instant.class)
+ .map(Instant::toEpochMilli)
+ .map(BsonDateTime::new);
+ break;
+ default:
+ // Use BsonUndefined to represent unsupported values.
+ value = Optional.of(new BsonUndefined());
+ break;
+ }
+ return value.orElse(BsonNull.VALUE);
+ }
+
+ @Override
+ public BsonString visit(FieldReferenceExpression fieldReference) {
+ return new BsonString(fieldReference.toString());
+ }
+
+ @Override
+ protected BsonDocument defaultMethod(Expression expression) {
+ return Filters.empty().toBsonDocument();
+ }
+
+    private static <T> Optional<T> extractExpression(
+            List<ResolvedExpression> expressions, Class<T> type) {
+ for (ResolvedExpression expression : expressions) {
+ if (type.isAssignableFrom(expression.getClass())) {
+ return Optional.of(type.cast(expression));
+ }
+ }
+ return Optional.empty();
+ }
+}
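
For illustration only (not part of this patch): a small sketch of what a merged pushdown filter looks like once rendered by the driver, using the same com.mongodb.client.model.Filters calls the visitor makes. The timestamp value assumes a UTC session time zone; the printed JSON shape is indicative only.

    import com.mongodb.client.model.Filters;
    import org.bson.BsonDateTime;
    import org.bson.BsonInt32;
    import org.bson.conversions.Bson;

    public class RenderedFilterSketch {
        public static void main(String[] args) {
            // Roughly what the visitor produces for:
            //   timestamp_col = TIMESTAMP '2022-01-01 07:00:01' AND id <> 6
            Bson pushed =
                    Filters.and(
                            // 2022-01-01T07:00:01Z as epoch milliseconds
                            Filters.eq("timestamp_col", new BsonDateTime(1641020401000L)),
                            Filters.ne("id", new BsonInt32(6)));
            // Prints the BSON document that MongoScanSourceSplitReader passes to find(filter);
            // note the driver may render equality predicates in their short form.
            System.out.println(pushed.toBsonDocument().toJson());
        }
    }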
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java
index 0578c57..57a49ff 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java
@@ -76,7 +76,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for using Mongo Source. */
@Testcontainers
-public class MongoSourceITCase {
+class MongoSourceITCase {
private static final int PARALLELISM = 2;
@@ -175,7 +175,7 @@ public class MongoSourceITCase {
@ParameterizedTest
@MethodSource("providePartitionStrategyAndCollection")
-    public void testPartitionStrategy(PartitionStrategy partitionStrategy, String collection)
+    void testPartitionStrategy(PartitionStrategy partitionStrategy, String collection)
throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -184,6 +184,7 @@ public class MongoSourceITCase {
.setPartitionSize(MemorySize.parse("1mb"))
.setSamplesPerPartition(3)
.setPartitionStrategy(partitionStrategy)
+ .setFilter(Filters.gt("f0", new BsonInt32(10000)))
.build();
List<RowData> results =
@@ -194,11 +195,11 @@ public class MongoSourceITCase {
"MongoDB-Source")
.executeAndCollect());
- assertThat(results).hasSize(TEST_RECORD_SIZE);
+ assertThat(results).hasSize(TEST_RECORD_SIZE - 10000);
}
@Test
- public void testLimit() throws Exception {
+ void testLimit() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
final int limitSize = 100;
@@ -220,7 +221,7 @@ public class MongoSourceITCase {
}
@Test
- public void testProject() throws Exception {
+ void testProject() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
MongoSource<String> mongoSource =
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
index e55ef6f..74e2de1 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
@@ -51,6 +51,7 @@ import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
+import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.types.Decimal128;
@@ -68,17 +69,15 @@ import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
@@ -88,7 +87,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for {@link MongoDynamicTableSource}. */
@Testcontainers
-public class MongoDynamicTableSourceITCase {
+class MongoDynamicTableSourceITCase {
private static final Logger LOG =
LoggerFactory.getLogger(MongoDynamicTableSinkITCase.class);
@@ -108,8 +107,8 @@ public class MongoDynamicTableSourceITCase {
private static MongoClient mongoClient;
- public static StreamExecutionEnvironment env;
- public static StreamTableEnvironment tEnv;
+ private static StreamExecutionEnvironment env;
+ private static StreamTableEnvironment tEnv;
@BeforeAll
static void beforeAll() {
@@ -121,8 +120,7 @@ public class MongoDynamicTableSourceITCase {
.getCollection(TEST_COLLECTION)
.withDocumentClass(BsonDocument.class);
-        List<BsonDocument> testRecords = Arrays.asList(createTestData(1), createTestData(2));
- coll.insertMany(testRecords);
+ coll.insertMany(createTestData());
}
@AfterAll
@@ -136,70 +134,42 @@ public class MongoDynamicTableSourceITCase {
void before() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
+ tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
}
@Test
- public void testSource() {
+ void testSource() {
tEnv.executeSql(createTestDDl(null));
-        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source").collect();
- List<String> result =
- CollectionUtil.iteratorToList(collected).stream()
- .map(Row::toString)
- .sorted()
- .collect(Collectors.toList());
+ List<Row> result = executeQueryToList("SELECT * FROM mongo_source");
- List<String> expected =
- Stream.of(
- "+I[1, 2, false, [3], 6,
2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13],
[11_1, 11_2], [+I[12_1], +I[12_2]]]",
- "+I[2, 2, false, [3], 6,
2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13],
[11_1, 11_2], [+I[12_1], +I[12_2]]]")
- .sorted()
- .collect(Collectors.toList());
-
- assertThat(result).isEqualTo(expected);
+ assertThat(result).isEqualTo(expectedRows());
}
@Test
- public void testProject() {
+ void testProject() {
tEnv.executeSql(createTestDDl(null));
- Iterator<Row> collected = tEnv.executeSql("SELECT f1, f10 FROM
mongo_source").collect();
- List<String> result =
- CollectionUtil.iteratorToList(collected).stream()
- .map(Row::toString)
- .sorted()
- .collect(Collectors.toList());
+ List<Row> result = executeQueryToList("SELECT f1, f10 FROM
mongo_source");
- List<String> expected =
- Stream.of("+I[2, +I[13]]", "+I[2,
+I[13]]").sorted().collect(Collectors.toList());
+ List<Row> expected = Arrays.asList(Row.of("2", Row.of(13)),
Row.of("3", Row.of(14)));
assertThat(result).isEqualTo(expected);
}
@Test
- public void testLimit() {
+ void testLimit() {
tEnv.executeSql(createTestDDl(null));
- Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source
LIMIT 1").collect();
- List<String> result =
- CollectionUtil.iteratorToList(collected).stream()
- .map(Row::toString)
- .sorted()
- .collect(Collectors.toList());
-
- Set<String> expected = new HashSet<>();
- expected.add(
- "+I[1, 2, false, [3], 6, 2022-09-07T10:25:28.127Z,
2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13], [11_1, 11_2], [+I[12_1],
+I[12_2]]]");
- expected.add(
- "+I[2, 2, false, [3], 6, 2022-09-07T10:25:28.127Z,
2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13], [11_1, 11_2], [+I[12_1],
+I[12_2]]]");
+ List<Row> result = executeQueryToList("SELECT * FROM mongo_source
LIMIT 1");
assertThat(result).hasSize(1);
- assertThat(result).containsAnyElementsOf(expected);
+ assertThat(result).containsAnyElementsOf(expectedRows());
}
@ParameterizedTest
@EnumSource(Caching.class)
- public void testLookupJoin(Caching caching) throws Exception {
+ void testLookupJoin(Caching caching) throws Exception {
// Create MongoDB lookup table
Map<String, String> lookupOptions = new HashMap<>();
if (caching.equals(Caching.ENABLE_CACHE)) {
@@ -239,20 +209,16 @@ public class MongoDynamicTableSourceITCase {
// Execute lookup join
try (CloseableIterator<Row> iterator =
- tEnv.executeSql(
- "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM
value_source"
- + " AS S JOIN mongo_source for
system_time as of S.proctime AS D ON S.id = D._id")
- .collect()) {
- List<String> result =
- CollectionUtil.iteratorToList(iterator).stream()
- .map(Row::toString)
- .sorted()
- .collect(Collectors.toList());
- List<String> expected =
+ executeQuery(
+ "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM
value_source"
+ + " AS S JOIN mongo_source for system_time as
of S.proctime AS D ON S.id = D._id")) {
+ List<Row> result = CollectionUtil.iteratorToList(iterator);
+
+ List<Row> expected =
Arrays.asList(
- "+I[1, Alice, 1, 2, false]",
- "+I[1, Alice, 1, 2, false]",
- "+I[2, Bob, 2, 2, false]");
+ Row.of(1L, "Alice", 1L, "2", true),
+ Row.of(1L, "Alice", 1L, "2", true),
+ Row.of(2L, "Bob", 2L, "3", false));
assertThat(result).hasSize(3);
assertThat(result).isEqualTo(expected);
@@ -275,13 +241,89 @@ public class MongoDynamicTableSourceITCase {
}
}
+ @Test
+ void testFilter() {
+ tEnv.executeSql(createTestDDl(null));
+
+        // we create a VIEW here to test column remapping, i.e. would filter push down work if we
+        // create a view that depends on our source table
+        tEnv.executeSql(
+                "CREATE VIEW fake_table (idx, f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12)"
+                        + " as (SELECT * from mongo_source )");
+
+ List<Row> allRows = executeQueryToList("SELECT * FROM mongo_source");
+ assertThat(allRows).hasSize(2);
+
+ Row onlyRow1 =
+ allRows.stream()
+ .filter(row -> row.getFieldAs(0).equals(1L))
+ .findAny()
+ .orElseThrow(NullPointerException::new);
+
+ Row onlyRow2 =
+ allRows.stream()
+ .filter(row -> row.getFieldAs(0).equals(2L))
+ .findAny()
+ .orElseThrow(NullPointerException::new);
+
+ // test the EQUALS filter
+ assertThat(executeQueryToList("SELECT * FROM fake_table WHERE 1 =
idx"))
+ .containsExactly(onlyRow1);
+
+ // test TIMESTAMP filter
+ assertThat(
+ executeQueryToList(
+ "SELECT * FROM fake_table WHERE f5 = TIMESTAMP
'2022-09-07 10:25:28.127'"))
+ .containsExactly(onlyRow1);
+
+ // test the IN operator
+ assertThat(executeQueryToList("SELECT * FROM fake_table WHERE idx IN
(2, 3)"))
+ .containsExactly(onlyRow2);
+
+ // test the NOT IN operator
+ assertThat(
+ executeQueryToList(
+ "SELECT * FROM fake_table WHERE f7 NOT IN
(CAST(1.0 AS DOUBLE), CAST(1.1 AS DOUBLE))"))
+ .containsExactly(onlyRow1);
+
+ // test mixing AND and OR operator
+ assertThat(executeQueryToList("SELECT * FROM fake_table WHERE idx <> 1
OR f8 = 1.10"))
+ .containsExactlyInAnyOrderElementsOf(allRows);
+
+ // test mixing AND/OR with parenthesis, and the swapping the operand
of equal expression
+ assertThat(
+ executeQueryToList(
+ "SELECT * FROM fake_table WHERE (f0 IS NOT
NULL AND f2 IS TRUE) OR f8 = 102.2"))
+ .containsExactly(onlyRow1);
+
+ // test Greater than and Less than
+ assertThat(executeQueryToList("SELECT * FROM fake_table WHERE f8 >
1.09 AND f8 < 1.11"))
+ .containsExactly(onlyRow1);
+
+ // One more test of parenthesis
+ assertThat(
+ executeQueryToList(
+ "SELECT * FROM fake_table WHERE f0 IS NULL AND
(f8 >= 1.11 OR f4 <= 5)"))
+ .containsExactly(onlyRow2);
+
+ assertThat(
+ executeQueryToList(
+ "SELECT * FROM mongo_source WHERE _id = 2 AND
f7 > 0.8 OR f7 < 1.1"))
+ .containsExactlyInAnyOrderElementsOf(allRows);
+
+ assertThat(
+ executeQueryToList(
+ "SELECT * FROM mongo_source WHERE 1 = _id AND
f1 NOT IN ('2', '3')"))
+ .isEmpty();
+ }
+
private static void validateCachedValues(LookupCache cache) {
// mongo does support project push down, the cached row has been
projected
RowData key1 = GenericRowData.of(1L);
- RowData value1 = GenericRowData.of(1L, StringData.fromString("2"),
false);
+ RowData value1 = GenericRowData.of(1L, StringData.fromString("2"),
true);
RowData key2 = GenericRowData.of(2L);
- RowData value2 = GenericRowData.of(2L, StringData.fromString("2"),
false);
+ RowData value2 = GenericRowData.of(2L, StringData.fromString("3"),
false);
RowData key3 = GenericRowData.of(3L);
@@ -319,12 +361,13 @@ public class MongoDynamicTableSourceITCase {
"CREATE TABLE mongo_source",
"(",
" _id BIGINT,",
+ " f0 STRING,",
" f1 STRING,",
" f2 BOOLEAN,",
" f3 BINARY,",
" f4 INTEGER,",
- " f5 TIMESTAMP_LTZ(6),",
- " f6 TIMESTAMP_LTZ(3),",
+ " f5 TIMESTAMP_LTZ(3),",
+ " f6 TIMESTAMP_LTZ(0),",
" f7 DOUBLE,",
" f8 DECIMAL(10, 2),",
" f9 MAP<STRING, INTEGER>,",
@@ -336,29 +379,99 @@ public class MongoDynamicTableSourceITCase {
")"));
}
- private static BsonDocument createTestData(long id) {
- return new BsonDocument()
- .append("_id", new BsonInt64(id))
- .append("f1", new BsonString("2"))
- .append("f2", BsonBoolean.FALSE)
- .append("f3", new BsonBinary(new byte[] {(byte) 3}))
- .append("f4", new BsonInt32(6))
- // 2022-09-07T10:25:28.127Z
- .append("f5", new BsonDateTime(1662546328127L))
- .append("f6", new BsonTimestamp(1662546328, 0))
- .append("f7", new BsonDouble(0.9d))
- .append("f8", new BsonDecimal128(new Decimal128(new
BigDecimal("1.10"))))
- .append("f9", new BsonDocument("k", new BsonInt32(12)))
- .append("f10", new BsonDocument("k", new BsonInt32(13)))
- .append(
- "f11",
- new BsonArray(
- Arrays.asList(new BsonString("11_1"), new
BsonString("11_2"))))
- .append(
- "f12",
- new BsonArray(
- Arrays.asList(
- new BsonDocument("k", new
BsonString("12_1")),
- new BsonDocument("k", new
BsonString("12_2")))));
+ private static List<Row> expectedRows() {
+ return Arrays.asList(
+ Row.of(
+ 1L,
+ "",
+ "2",
+ true,
+ new byte[] {(byte) 3},
+ 6,
+ Instant.ofEpochMilli(1662546328127L),
+ Instant.ofEpochSecond(1662546328L),
+ 0.9d,
+ new BigDecimal("1.10"),
+ Collections.singletonMap("k", 12),
+ Row.of(13),
+ new String[] {"11_1", "11_2"},
+ new Row[] {Row.of("12_1"), Row.of("12_2")}),
+ Row.of(
+ 2L,
+ null,
+ "3",
+ false,
+ new byte[] {(byte) 4},
+ 7,
+ Instant.ofEpochMilli(1662546328128L),
+ Instant.ofEpochSecond(1662546329L),
+ 1.0d,
+ new BigDecimal("1.11"),
+ Collections.singletonMap("k", 13),
+ Row.of(14),
+ new String[] {"11_3", "11_4"},
+ new Row[] {Row.of("12_3"), Row.of("12_4")}));
+ }
+
+ private static List<BsonDocument> createTestData() {
+ return Arrays.asList(
+ new BsonDocument()
+ .append("_id", new BsonInt64(1L))
+ .append("f0", new BsonString(""))
+ .append("f1", new BsonString("2"))
+ .append("f2", BsonBoolean.TRUE)
+ .append("f3", new BsonBinary(new byte[] {(byte) 3}))
+ .append("f4", new BsonInt32(6))
+ // 2022-09-07T10:25:28.127Z
+ .append("f5", new BsonDateTime(1662546328127L))
+ .append("f6", new BsonTimestamp(1662546328, 0))
+ .append("f7", new BsonDouble(0.9d))
+ .append("f8", new BsonDecimal128(new Decimal128(new
BigDecimal("1.10"))))
+ .append("f9", new BsonDocument("k", new BsonInt32(12)))
+ .append("f10", new BsonDocument("k", new
BsonInt32(13)))
+ .append(
+ "f11",
+ new BsonArray(
+ Arrays.asList(
+ new BsonString("11_1"), new
BsonString("11_2"))))
+ .append(
+ "f12",
+ new BsonArray(
+ Arrays.asList(
+ new BsonDocument("k", new
BsonString("12_1")),
+ new BsonDocument("k", new
BsonString("12_2"))))),
+ new BsonDocument()
+ .append("_id", new BsonInt64(2L))
+ .append("f0", BsonNull.VALUE)
+ .append("f1", new BsonString("3"))
+ .append("f2", BsonBoolean.FALSE)
+ .append("f3", new BsonBinary(new byte[] {(byte) 4}))
+ .append("f4", new BsonInt32(7))
+ // 2022-09-07T10:25:28.128Z
+ .append("f5", new BsonDateTime(1662546328128L))
+ .append("f6", new BsonTimestamp(1662546329, 0))
+ .append("f7", new BsonDouble(1.0d))
+ .append("f8", new BsonDecimal128(new Decimal128(new
BigDecimal("1.11"))))
+ .append("f9", new BsonDocument("k", new BsonInt32(13)))
+ .append("f10", new BsonDocument("k", new
BsonInt32(14)))
+ .append(
+ "f11",
+ new BsonArray(
+ Arrays.asList(
+ new BsonString("11_3"), new
BsonString("11_4"))))
+ .append(
+ "f12",
+ new BsonArray(
+ Arrays.asList(
+ new BsonDocument("k", new
BsonString("12_3")),
+ new BsonDocument("k", new
BsonString("12_4"))))));
+ }
+
+ private static List<Row> executeQueryToList(String sql) {
+ return CollectionUtil.iteratorToList(executeQuery(sql));
+ }
+
+ private static CloseableIterator<Row> executeQuery(String sql) {
+ return tEnv.executeSql(sql).collect();
}
}
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
new file mode 100644
index 0000000..1dbb8e9
--- /dev/null
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.mongodb.client.model.Filters;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.conversions.Bson;
+import org.bson.types.Decimal128;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+import static org.apache.flink.connector.mongodb.table.MongoDynamicTableSource.parseFilter;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MongoFilterPushDownVisitor}. */
+class MongoFilterPushDownVisitorTest {
+
+ private static final String INPUT_TABLE = "mongo_source";
+
+    private static final BsonDocument EMPTY_FILTER = Filters.empty().toBsonDocument();
+
+ private static StreamExecutionEnvironment env;
+ private static StreamTableEnvironment tEnv;
+
+    private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+ @BeforeEach
+ void before() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(env);
+ tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+
+ // Create table in Flink, this can be reused across test cases
+ tEnv.executeSql(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + "("
+ + "id INTEGER,"
+ + "description VARCHAR(200),"
+ + "boolean_col BOOLEAN,"
+ + "timestamp_col TIMESTAMP(0),"
+ + "timestamp3_col TIMESTAMP(3),"
+ + "double_col DOUBLE,"
+ + "decimal_col DECIMAL(10, 4)"
+ + ") WITH ("
+ + " 'connector'='mongodb',"
+ + " 'uri'='mongodb://127.0.0.1:27017',"
+ + " 'database'='test_db',"
+ + " 'collection'='test_coll'"
+ + ")");
+ }
+
+ @Test
+ void testSimpleExpressionPrimitiveType() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ Arrays.asList(
+ new Object[] {"id = 6", Filters.eq("id", new
BsonInt32(6))},
+ new Object[] {"id >= 6", Filters.gte("id", new
BsonInt32(6))},
+ new Object[] {"id > 6", Filters.gt("id", new
BsonInt32(6))},
+ new Object[] {"id < 6", Filters.lt("id", new
BsonInt32(6))},
+ new Object[] {"id <= 5", Filters.lte("id", 5)},
+ new Object[] {
+ "description = 'Halo'",
+ Filters.eq("description", new BsonString("Halo"))
+ },
+ new Object[] {
+ "boolean_col = true", Filters.eq("boolean_col",
new BsonBoolean(true))
+ },
+ new Object[] {
+ "boolean_col = false", Filters.eq("boolean_col",
new BsonBoolean(false))
+ },
+ new Object[] {
+ "double_col > 0.5",
+ Filters.gt(
+ "double_col",
+ new BsonDecimal128(new Decimal128(new
BigDecimal("0.5"))))
+ },
+ new Object[] {
+ "decimal_col <= -0.3",
+ Filters.lte(
+ "decimal_col",
+ new BsonDecimal128(new Decimal128(new
BigDecimal("-0.3"))))
+ })
+ .forEach(
+ inputs ->
+ assertGeneratedFilter(
+ (String) inputs[0],
+ schema,
+ ((Bson) inputs[1]).toBsonDocument()));
+ }
+
+ @Test
+ void testComplexExpressionDatetime() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+ assertGeneratedFilter(
+ "id = 6 AND timestamp_col = TIMESTAMP '2022-01-01 07:00:01'",
+ schema,
+ Filters.and(
+ Filters.eq("id", new BsonInt32(6)),
+ Filters.eq(
+ "timestamp_col",
+ new BsonDateTime(
+ Timestamp.valueOf("2022-01-01
07:00:01")
+ .getTime())))
+ .toBsonDocument());
+
+ assertGeneratedFilter(
+ "timestamp3_col = TIMESTAMP '2022-01-01 07:00:01.333' OR
description = 'Halo'",
+ schema,
+ Filters.or(
+ Filters.eq(
+ "timestamp3_col",
+ new BsonDateTime(
+ Timestamp.valueOf("2022-01-01
07:00:01.333")
+ .getTime())),
+ Filters.eq("description", new
BsonString("Halo")))
+ .toBsonDocument());
+ }
+
+ @Test
+ void testExpressionWithNull() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+
+ assertGeneratedFilter(
+ "id = NULL AND decimal_col <= 0.6",
+ schema,
+ Filters.and(
+ Filters.eq("id", BsonNull.VALUE),
+ Filters.lte(
+ "decimal_col",
+ new BsonDecimal128(new Decimal128(new
BigDecimal("0.6")))))
+ .toBsonDocument());
+
+ assertGeneratedFilter(
+ "id = 6 OR description = NULL",
+ schema,
+ Filters.or(
+ Filters.eq("id", new BsonInt32(6)),
+ Filters.eq("description", BsonNull.VALUE))
+ .toBsonDocument());
+ }
+
+ @Test
+ void testExpressionIsNull() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+
+ assertGeneratedFilter(
+ "id IS NULL AND decimal_col <= 0.6",
+ schema,
+ Filters.and(
+ Filters.eq("id", BsonNull.VALUE),
+ Filters.lte(
+ "decimal_col",
+ new BsonDecimal128(new Decimal128(new
BigDecimal("0.6")))))
+ .toBsonDocument());
+
+ assertGeneratedFilter(
+ "id = 6 OR description IS NOT NULL",
+ schema,
+ Filters.or(
+ Filters.eq("id", new BsonInt32(6)),
+ Filters.ne("description", BsonNull.VALUE))
+ .toBsonDocument());
+ }
+
+ @Test
+ void testExpressionCannotBePushedDown() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+
+ // unsupported operators
+ assertGeneratedFilter("description LIKE '_bcd%'", schema,
EMPTY_FILTER);
+
+ // nested complex expressions
+ assertGeneratedFilter("double_col = decimal_col", schema,
EMPTY_FILTER);
+ assertGeneratedFilter("boolean_col = (decimal_col > 2.0)", schema,
EMPTY_FILTER);
+
+ // partial push down
+ assertGeneratedFilter(
+ "id IS NULL AND description LIKE '_bcd%'",
+ schema, Filters.eq("id", BsonNull.VALUE).toBsonDocument());
+
+ // sub filter cannot be pushed down
+ assertGeneratedFilter("id IS NOT NULL OR double_col = decimal_col",
schema, EMPTY_FILTER);
+ }
+
+    private void assertGeneratedFilter(
+            String inputExpr, ResolvedSchema schema, BsonDocument expected) {
+        List<ResolvedExpression> filters = resolveSQLFilterToExpression(inputExpr, schema);
+
+ List<Bson> mongoFilters = new ArrayList<>();
+ for (ResolvedExpression filter : filters) {
+ BsonDocument simpleFilter = parseFilter(filter);
+ if (!simpleFilter.isEmpty()) {
+ mongoFilters.add(simpleFilter);
+ }
+ }
+
+ BsonDocument actual = EMPTY_FILTER;
+ if (!mongoFilters.isEmpty()) {
+ Bson mergedFilter =
+                    mongoFilters.size() == 1 ? mongoFilters.get(0) : Filters.and(mongoFilters);
+ actual = mergedFilter.toBsonDocument();
+ }
+
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ /**
+     * Resolve a SQL filter expression against a Schema, this method makes use of some
+ * implementation details of Flink.
+ */
+    private List<ResolvedExpression> resolveSQLFilterToExpression(
+            String sqlExp, ResolvedSchema schema) {
+ StreamTableEnvironmentImpl tbImpl = (StreamTableEnvironmentImpl) tEnv;
+
+        FlinkContext ctx = ((PlannerBase) tbImpl.getPlanner()).getFlinkContext();
+ CatalogManager catMan = tbImpl.getCatalogManager();
+ FunctionCatalog funCat = ctx.getFunctionCatalog();
+        RowType sourceType = (RowType) schema.toSourceRowDataType().getLogicalType();
+
+ FlinkTypeFactory typeFactory = new FlinkTypeFactory(classLoader,
FlinkTypeSystem.INSTANCE);
+ RexBuilder rexBuilder = new RexBuilder(typeFactory);
+ RexNodeToExpressionConverter converter =
+ new RexNodeToExpressionConverter(
+ rexBuilder,
+ sourceType.getFieldNames().toArray(new String[0]),
+ funCat,
+ catMan,
+
TimeZone.getTimeZone(tEnv.getConfig().getLocalTimeZone()));
+
+ RexNodeExpression rexExp =
+ (RexNodeExpression)
tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
+
+ RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
+ // converts the cnf condition to a list of AND conditions
+ List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);
+
+ List<Expression> resolvedExps =
+ conjunctions.stream()
+ .map(rex -> rex.accept(converter))
+ .filter(Option::isDefined)
+ .map(Option::get)
+ .collect(Collectors.toList());
+
+ ExpressionResolver resolver =
+ ExpressionResolver.resolverFor(
+ tEnv.getConfig(),
+ classLoader,
+ name -> Optional.empty(),
+ funCat.asLookup(
+ str -> {
+ throw new TableException(
+ "We should not need to
lookup any expressions at this point");
+ }),
+ catMan.getDataTypeFactory(),
+ (sqlExpression, inputRowType, outputType) -> {
+ throw new TableException(
+ "SQL expression parsing is not
supported at this location.");
+ })
+ .build();
+
+ return resolver.resolve(resolvedExps);
+ }
+}
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
new file mode 100644
index 0000000..133cbe7
--- /dev/null
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.rules.TestName;
+
+import java.time.ZoneId;
+
+/** Plan tests for Mongo connector, for example, testing projection push down. */
+public class MongoTablePlanTest extends TableTestBase {
+
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
+
+ private TestInfo testInfo;
+
+ @BeforeEach
+ public void setup(TestInfo testInfo) {
+ this.testInfo = testInfo;
+ TableEnvironment tEnv = util.tableEnv();
+ tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+ tEnv.executeSql(
+ "CREATE TABLE mongo ("
+ + "id BIGINT,"
+ + "description VARCHAR(200),"
+ + "boolean_col BOOLEAN,"
+ + "timestamp_col TIMESTAMP_LTZ(0),"
+ + "timestamp3_col TIMESTAMP_LTZ(3),"
+ + "int_col INTEGER,"
+ + "double_col DOUBLE,"
+ + "decimal_col DECIMAL(10, 4)"
+ + ") WITH ("
+ + " 'connector'='mongodb',"
+ + " 'uri'='mongodb://127.0.0.1:27017',"
+ + " 'database'='test_db',"
+ + " 'collection'='test_coll'"
+ + ")");
+ }
+
+ @Test
+ public void testFilterPushdown() {
+ util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001 AND timestamp3_col <> TIMESTAMP '2022-09-07 10:25:28.127' OR double_col >= -1000.23");
+ }
+
+ @Test
+ public void testFilterPartialPushdown() {
+ util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001 AND boolean_col = (decimal_col > 2.0)");
+ }
+
+ @Test
+ public void testFilterCannotPushdown() {
+ util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id IS NOT NULL OR double_col = decimal_col");
+ }
+
+    // A workaround to get the test method name for flink versions not completely migrated to JUnit5
+ public TestName name() {
+ return new TestName() {
+ @Override
+ public String getMethodName() {
+ return testInfo.getTestMethod().get().getName();
+ }
+ };
+ }
+}
diff --git a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
new file mode 100644
index 0000000..b2c57e4
--- /dev/null
+++ b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testFilterPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001
AND timestamp3_col <> TIMESTAMP '2022-09-07 10:25:28.127' OR double_col >=
-1000.23]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($4, CAST(2022-09-07
10:25:28.127:TIMESTAMP(3)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)), >=($6,
-1000.23:DECIMAL(6, 2)))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, mongo,
filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))),
OR(<>(timestamp3_col, 2022-09-07
10:25:28.127:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), >=(double_col,
-1000.23:DECIMAL(6, 2))))], project=[id, timestamp3_col, int_col]]],
fields=[id, timestamp3_col, int_col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPartialPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001
AND boolean_col = (decimal_col > 2.0)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[AND(=($0, 900001), =($2, >($7, 2.0:DECIMAL(2,
1))))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[CAST(900001 AS BIGINT) AS id, timestamp3_col, int_col],
where=[(boolean_col = (decimal_col > 2.0))])
++- TableSourceScan(table=[[default_catalog, default_database, mongo,
filter=[=(id, 900001:BIGINT)], project=[boolean_col, timestamp3_col, int_col,
decimal_col]]], fields=[boolean_col, timestamp3_col, int_col, decimal_col])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterCannotPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id IS NOT
NULL OR double_col = decimal_col]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[OR(IS NOT NULL($0), =($6, CAST($7):DOUBLE))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[id, timestamp3_col, int_col], where=[(id IS NOT NULL OR
(double_col = CAST(decimal_col AS DOUBLE)))])
++- TableSourceScan(table=[[default_catalog, default_database, mongo,
filter=[], project=[id, timestamp3_col, int_col, double_col, decimal_col]]],
fields=[id, timestamp3_col, int_col, double_col, decimal_col])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/pom.xml b/pom.xml
index 7d2414b..9b470ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,8 @@ under the License.
<mongodb.version>4.7.2</mongodb.version>
<flink.version>1.17.0</flink.version>
-
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala-library.version>2.12.7</scala-library.version>
<junit5.version>5.8.1</junit5.version>
<assertj.version>3.21.0</assertj.version>
<testcontainers.version>1.17.2</testcontainers.version>
@@ -311,6 +312,13 @@ under the License.
<version>2.1</version>
</dependency>
+ <!-- For dependency convergence -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala-library.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>