This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new fbabbd6  [FLINK-33257][connectors/mongodb] Support filter pushdown in MongoDB connector
fbabbd6 is described below

commit fbabbd6b2d535c1c3fe37636e476f1a52990fc48
Author: Jiabao Sun <[email protected]>
AuthorDate: Fri Jan 26 00:10:17 2024 +0800

    [FLINK-33257][connectors/mongodb] Support filter pushdown in MongoDB connector
    
    This closes #17.
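
    For context, a minimal sketch of what this change enables (illustrative
    only, not part of the patch; the table and field names are assumed): a
    WHERE clause on a MongoDB-backed table can now be translated into a native
    MongoDB query document and evaluated by MongoDB instead of by Flink.

        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;

        public class FilterPushDownSketch {
            public static void main(String[] args) {
                TableEnvironment tEnv =
                        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
                tEnv.executeSql(
                        "CREATE TABLE mongo_source (_id BIGINT, f1 STRING, f4 INTEGER) WITH ("
                                + " 'connector' = 'mongodb',"
                                + " 'uri' = 'mongodb://127.0.0.1:27017',"
                                + " 'database' = 'test_db',"
                                + " 'collection' = 'test_coll')");
                // With this patch, the predicate below can be pushed down to MongoDB
                // as a $and of $gt/$eq filters instead of being evaluated row by row.
                tEnv.executeSql("SELECT * FROM mongo_source WHERE f4 > 5 AND f1 = '2'").print();
            }
        }
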
---
 docs/content.zh/docs/connectors/table/mongodb.md   |  56 ++++
 docs/content/docs/connectors/table/mongodb.md      |  56 ++++
 flink-connector-mongodb/pom.xml                    |  10 +-
 .../connector/mongodb/source/MongoSource.java      |   7 +
 .../mongodb/source/MongoSourceBuilder.java         |  17 ++
 .../reader/split/MongoScanSourceSplitReader.java   |   7 +-
 .../mongodb/table/MongoDynamicTableSource.java     |  69 ++++-
 .../mongodb/table/MongoFilterPushDownVisitor.java  | 249 ++++++++++++++++
 .../mongodb/source/MongoSourceITCase.java          |  11 +-
 .../table/MongoDynamicTableSourceITCase.java       | 291 +++++++++++++------
 .../table/MongoFilterPushDownVisitorTest.java      | 319 +++++++++++++++++++++
 .../mongodb/table/MongoTablePlanTest.java          |  89 ++++++
 .../connector/mongodb/table/MongoTablePlanTest.xml |  72 +++++
 pom.xml                                            |  10 +-
 14 files changed, 1159 insertions(+), 104 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/mongodb.md b/docs/content.zh/docs/connectors/table/mongodb.md
index fd0ba4e..c111481 100644
--- a/docs/content.zh/docs/connectors/table/mongodb.md
+++ b/docs/content.zh/docs/connectors/table/mongodb.md
@@ -340,6 +340,62 @@ The main purpose of the lookup cache is to improve the performance of temporal joins with the MongoDB connector.
 If there are failures, the Flink job will recover from the last successful checkpoint and re-process, which can lead to messages being re-processed during recovery.
 The upsert mode is strongly recommended, as it helps avoid violating primary key constraints and producing duplicate data if records need to be re-processed.
 
+### Filter Pushdown
+
+MongoDB supports pushing down simple comparisons and logical filters to optimize queries.
+The mappings from Flink SQL filters to MongoDB query operators are listed in the following table.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink SQL filters</th>
+        <th class="text-left"><a href="https://www.mongodb.com/docs/manual/reference/operator/query/">MongoDB Query Operators</a></th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>=</code></td>
+      <td><code>$eq</code></td>
+    </tr>
+    <tr>
+      <td><code>&lt;&gt;</code></td>
+      <td><code>$ne</code></td>
+    </tr>
+    <tr>
+      <td><code>&gt;</code></td>
+      <td><code>$gt</code></td>
+    </tr>
+    <tr>
+      <td><code>&gt;=</code></td>
+      <td><code>$gte</code></td>
+    </tr>
+    <tr>
+      <td><code>&lt;</code></td>
+      <td><code>$lt</code></td>
+    </tr>
+    <tr>
+      <td><code>&lt;=</code></td>
+      <td><code>$lte</code></td>
+    </tr>
+    <tr>
+      <td><code>IS NULL</code></td>
+      <td><code>$eq : null</code></td>
+    </tr>
+    <tr>
+      <td><code>IS NOT NULL</code></td>
+      <td><code>$ne : null</code></td>
+    </tr>
+    <tr>
+      <td><code>OR</code></td>
+      <td><code>$or</code></td>
+    </tr>
+    <tr>
+      <td><code>AND</code></td>
+      <td><code>$and</code></td>
+    </tr>
+    </tbody>
+</table>
+
 Data Type Mapping
 ----------------
 The field data type mappings from MongoDB BSON types to Flink SQL data types are listed in the following table.
diff --git a/docs/content/docs/connectors/table/mongodb.md b/docs/content/docs/connectors/table/mongodb.md
index 9a5dfd6..7ce3bdf 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -371,6 +371,62 @@ If there are failures, the Flink job will recover and re-process from last succe
 which can lead to re-processing messages during recovery. The upsert mode is highly recommended as
 it helps avoid constraint violations or duplicate data if records need to be re-processed.
 
+### Filter Pushdown
+
+MongoDB supports pushing down simple comparisons and logical filters to optimize queries.
+The mappings from Flink SQL filters to MongoDB query operators are listed in the following table.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink SQL filters</th>
+        <th class="text-left"><a href="https://www.mongodb.com/docs/manual/reference/operator/query/">MongoDB Query Operators</a></th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>=</code></td>
+      <td><code>$eq</code></td>
+    </tr>
+    <tr>
+      <td><code>&lt;&gt;</code></td>
+      <td><code>$ne</code></td>
+    </tr>
+    <tr>
+      <td><code>&gt;</code></td>
+      <td><code>$gt</code></td>
+    </tr>
+    <tr>
+      <td><code>&gt;=</code></td>
+      <td><code>$gte</code></td>
+    </tr>
+    <tr>
+      <td><code>&lt;</code></td>
+      <td><code>$lt</code></td>
+    </tr>
+    <tr>
+      <td><code>&lt;=</code></td>
+      <td><code>$lte</code></td>
+    </tr>
+    <tr>
+      <td><code>IS NULL</code></td>
+      <td><code>$eq : null</code></td>
+    </tr>
+    <tr>
+      <td><code>IS NOT NULL</code></td>
+      <td><code>$ne : null</code></td>
+    </tr>
+    <tr>
+      <td><code>OR</code></td>
+      <td><code>$or</code></td>
+    </tr>
+    <tr>
+      <td><code>AND</code></td>
+      <td><code>$and</code></td>
+    </tr>
+    </tbody>
+</table>
+
 Data Type Mapping
 ----------------
 The field data type mappings from MongoDB BSON types to Flink SQL data types are listed in the following table.
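
To make the mapping table above concrete: a hedged sketch using the MongoDB Java
driver's Filters helpers (the field names a, b, c are illustrative, not from the
patch) of the query document a pushed-down predicate becomes.

    import com.mongodb.client.model.Filters;
    import org.bson.BsonNull;
    import org.bson.conversions.Bson;

    public class FilterMappingSketch {
        public static void main(String[] args) {
            // Flink SQL: WHERE a = 1 AND (b <> 2 OR c IS NULL)
            Bson filter =
                    Filters.and(                                       // AND     -> $and
                            Filters.eq("a", 1),                        // =       -> $eq
                            Filters.or(                                // OR      -> $or
                                    Filters.ne("b", 2),                // <>      -> $ne
                                    Filters.eq("c", BsonNull.VALUE))); // IS NULL -> $eq : null
            // Prints the composed MongoDB query document.
            System.out.println(filter.toBsonDocument().toJson());
        }
    }
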
diff --git a/flink-connector-mongodb/pom.xml b/flink-connector-mongodb/pom.xml
index 7eb4182..e8959a8 100644
--- a/flink-connector-mongodb/pom.xml
+++ b/flink-connector-mongodb/pom.xml
@@ -115,11 +115,19 @@ under the License.
                <!-- Table API integration tests -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner-loader</artifactId>
+                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${flink.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-runtime</artifactId>
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
index 6b1b89c..bbec562 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSource.java
@@ -46,6 +46,7 @@ import org.apache.flink.connector.mongodb.source.split.MongoSourceSplitSerialize
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
 
 import javax.annotation.Nullable;
 
@@ -87,6 +88,9 @@ public class MongoSource<OUT>
     /** The projections for MongoDB source. */
     @Nullable private final List<String> projectedFields;
 
+    /** The filter for MongoDB source. */
+    private final BsonDocument filter;
+
     /** The limit for MongoDB source. */
     private final int limit;
 
@@ -100,11 +104,13 @@ public class MongoSource<OUT>
             MongoConnectionOptions connectionOptions,
             MongoReadOptions readOptions,
             @Nullable List<String> projectedFields,
+            Bson filter,
             int limit,
             MongoDeserializationSchema<OUT> deserializationSchema) {
         this.connectionOptions = checkNotNull(connectionOptions);
         this.readOptions = checkNotNull(readOptions);
         this.projectedFields = projectedFields;
+        this.filter = filter.toBsonDocument();
         this.limit = limit;
         // Only support bounded mode for now.
         // We can implement unbounded mode by ChangeStream future.
@@ -140,6 +146,7 @@ public class MongoSource<OUT>
                                 connectionOptions,
                                 readOptions,
                                 projectedFields,
+                                filter,
                                 mongoReaderContext);
 
         return new MongoSourceReader<>(
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java
index c5252eb..6aa0af4 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/MongoSourceBuilder.java
@@ -25,7 +25,9 @@ import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionSt
 import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
 import org.apache.flink.connector.mongodb.source.reader.split.MongoScanSourceSplitReader;
 
+import com.mongodb.client.model.Filters;
 import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
 
 import java.util.Arrays;
 import java.util.List;
@@ -47,6 +49,9 @@ public class MongoSourceBuilder<OUT> {
     private final MongoReadOptions.MongoReadOptionsBuilder readOptionsBuilder;
 
     private List<String> projectedFields;
+
+    private Bson filter = Filters.empty();
+
     private int limit = -1;
     private MongoDeserializationSchema<OUT> deserializationSchema;
 
@@ -168,6 +173,17 @@ public class MongoSourceBuilder<OUT> {
         return this;
     }
 
+    /**
+     * Sets the filter of documents to read.
+     *
+     * @param filter the filter of documents to read.
+     * @return this builder
+     */
+    public MongoSourceBuilder<OUT> setFilter(Bson filter) {
+        this.filter = checkNotNull(filter, "The filter must not be null");
+        return this;
+    }
+
     /**
      * Sets the projection fields of documents to read.
      *
@@ -216,6 +232,7 @@ public class MongoSourceBuilder<OUT> {
                 connectionOptionsBuilder.build(),
                 readOptionsBuilder.build(),
                 projectedFields,
+                filter,
                 limit,
                 deserializationSchema);
     }
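
A hedged usage sketch of the new builder option. setFilter comes from this
patch; the other setters exist on MongoSourceBuilder, but the URI, database,
collection, and the deserialization schema parameter are assumptions for
illustration.

    import com.mongodb.client.model.Filters;
    import org.apache.flink.connector.mongodb.source.MongoSource;
    import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
    import org.bson.BsonInt32;

    public class SetFilterSketch {
        // Only documents with f0 > 10000 are read, matching the filter
        // exercised by MongoSourceITCase further down in this patch.
        static MongoSource<String> newSource(MongoDeserializationSchema<String> schema) {
            return MongoSource.<String>builder()
                    .setUri("mongodb://127.0.0.1:27017")
                    .setDatabase("test_db")
                    .setCollection("test_coll")
                    .setFilter(Filters.gt("f0", new BsonInt32(10000)))
                    .setDeserializationSchema(schema)
                    .build();
        }
    }
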
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
index e521c91..26c584e 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java
@@ -36,6 +36,7 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCursor;
 import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,8 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
     private final MongoSourceReaderContext readerContext;
     @Nullable private final List<String> projectedFields;
 
+    private final Bson filter;
+
     private boolean closed = false;
     private boolean finished = false;
     private MongoClient mongoClient;
@@ -67,10 +70,12 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
             MongoConnectionOptions connectionOptions,
             MongoReadOptions readOptions,
             @Nullable List<String> projectedFields,
+            Bson filter,
             MongoSourceReaderContext readerContext) {
         this.connectionOptions = connectionOptions;
         this.readOptions = readOptions;
         this.projectedFields = projectedFields;
+        this.filter = filter;
         this.readerContext = readerContext;
     }
 
@@ -174,7 +179,7 @@ public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoS
                     mongoClient
                             .getDatabase(connectionOptions.getDatabase())
                             .getCollection(connectionOptions.getCollection(), BsonDocument.class)
-                            .find()
+                            .find(filter)
                             .min(currentSplit.getMin())
                             .max(currentSplit.getMax())
                             .hint(currentSplit.getHint())
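
For intuition, the per-split driver call now looks roughly like the sketch
below: the pushed-down filter narrows results within each split, while
min()/max()/hint() continue to pin the scan to the split's key range (the
method shape and parameter names are assumed for illustration).

    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import org.bson.BsonDocument;
    import org.bson.conversions.Bson;

    public class SplitScanSketch {
        static MongoCursor<BsonDocument> openCursor(
                MongoCollection<BsonDocument> collection,
                Bson filter,      // pushed-down filter, Filters.empty() if none
                BsonDocument min, // split lower bound
                BsonDocument max, // split upper bound
                BsonDocument hint, // index hint backing the split bounds
                int limit) {
            return collection
                    .find(filter) // the behavioral change: find() now takes the filter
                    .min(min)
                    .max(max)
                    .hint(hint)
                    .limit(limit)
                    .cursor();
        }
    }
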
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
index d3e2a53..a20f99a 100644
--- a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
@@ -36,10 +37,18 @@ import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import com.mongodb.client.model.Filters;
+import org.bson.BsonDocument;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -55,8 +64,11 @@ public class MongoDynamicTableSource
         implements ScanTableSource,
                 LookupTableSource,
                 SupportsProjectionPushDown,
+                SupportsFilterPushDown,
                 SupportsLimitPushDown {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSource.class);
+
     private final MongoConnectionOptions connectionOptions;
     private final MongoReadOptions readOptions;
     @Nullable private final LookupCache lookupCache;
@@ -65,6 +77,8 @@ public class MongoDynamicTableSource
     private DataType producedDataType;
     private int limit = -1;
 
+    private BsonDocument filter = Filters.empty().toBsonDocument();
+
     public MongoDynamicTableSource(
             MongoConnectionOptions connectionOptions,
             MongoReadOptions readOptions,
@@ -135,6 +149,7 @@ public class MongoDynamicTableSource
                         .setSamplesPerPartition(readOptions.getSamplesPerPartition())
                         .setLimit(limit)
                         .setProjectedFields(DataType.getFieldNames(producedDataType))
+                        .setFilter(filter)
                         .setDeserializationSchema(deserializationSchema)
                         .build();
 
@@ -148,13 +163,16 @@ public class MongoDynamicTableSource
 
     @Override
     public DynamicTableSource copy() {
-        return new MongoDynamicTableSource(
-                connectionOptions,
-                readOptions,
-                lookupCache,
-                lookupMaxRetries,
-                lookupRetryIntervalMs,
-                producedDataType);
+        MongoDynamicTableSource newSource =
+                new MongoDynamicTableSource(
+                        connectionOptions,
+                        readOptions,
+                        lookupCache,
+                        lookupMaxRetries,
+                        lookupRetryIntervalMs,
+                        producedDataType);
+        newSource.filter = BsonDocument.parse(filter.toJson());
+        return newSource;
     }
 
     @Override
@@ -178,6 +196,41 @@ public class MongoDynamicTableSource
         this.producedDataType = producedDataType;
     }
 
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+        List<ResolvedExpression> remainingFilters = new ArrayList<>();
+
+        List<Bson> mongoFilters = new ArrayList<>();
+        for (ResolvedExpression filter : filters) {
+            BsonDocument simpleFilter = parseFilter(filter);
+            if (simpleFilter.isEmpty()) {
+                remainingFilters.add(filter);
+            } else {
+                acceptedFilters.add(filter);
+                mongoFilters.add(simpleFilter);
+            }
+        }
+
+        if (!mongoFilters.isEmpty()) {
+            Bson mergedFilter =
+                    mongoFilters.size() == 1 ? mongoFilters.get(0) : Filters.and(mongoFilters);
+            this.filter = mergedFilter.toBsonDocument();
+            LOG.info("Pushed down filters: {}", filter.toJson());
+        }
+
+        return Result.of(acceptedFilters, remainingFilters);
+    }
+
+    static BsonDocument parseFilter(ResolvedExpression filter) {
+        if (filter instanceof CallExpression) {
+            CallExpression callExp = (CallExpression) filter;
+            return MongoFilterPushDownVisitor.INSTANCE.visit(callExp);
+        } else {
+            return Filters.empty().toBsonDocument();
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof MongoDynamicTableSource)) {
@@ -188,6 +241,7 @@ public class MongoDynamicTableSource
                 && Objects.equals(readOptions, that.readOptions)
                 && Objects.equals(producedDataType, that.producedDataType)
                 && Objects.equals(limit, that.limit)
+                && Objects.equals(filter, that.filter)
                 && Objects.equals(lookupCache, that.lookupCache)
                 && Objects.equals(lookupMaxRetries, that.lookupMaxRetries)
                 && Objects.equals(lookupRetryIntervalMs, that.lookupRetryIntervalMs);
@@ -200,6 +254,7 @@ public class MongoDynamicTableSource
                 readOptions,
                 producedDataType,
                 limit,
+                filter,
                 lookupCache,
                 lookupMaxRetries,
                 lookupRetryIntervalMs);
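
A hedged sketch of the merge step in applyFilters above: a single accepted
filter is used as-is, while several accepted filters are wrapped in $and (the
example predicates are illustrative). Expressions the visitor cannot translate
stay in remainingFilters and are still evaluated by Flink, so pushdown never
changes query results.

    import com.mongodb.client.model.Filters;
    import org.bson.conversions.Bson;

    import java.util.Arrays;
    import java.util.List;

    public class MergeFiltersSketch {
        public static void main(String[] args) {
            // e.g. both predicates accepted from "WHERE f4 > 5 AND f1 = '2'"
            List<Bson> mongoFilters =
                    Arrays.asList(Filters.gt("f4", 5), Filters.eq("f1", "2"));
            Bson merged =
                    mongoFilters.size() == 1 ? mongoFilters.get(0) : Filters.and(mongoFilters);
            // Prints the merged document handed to MongoSourceBuilder.setFilter.
            System.out.println(merged.toBsonDocument().toJson());
        }
    }
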
diff --git a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java
new file mode 100644
index 0000000..a8735b8
--- /dev/null
+++ b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitor.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.mongodb.client.model.Filters;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.BsonType;
+import org.bson.BsonUndefined;
+import org.bson.BsonValue;
+import org.bson.conversions.Bson;
+import org.bson.types.Decimal128;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Visitor that converts an Expression to a Bson filter. Returns {@link Filters#empty()} if we cannot push
+ * down the filter.
+ */
+@Experimental
+public class MongoFilterPushDownVisitor extends ExpressionDefaultVisitor<BsonValue> {
+
+    public static final MongoFilterPushDownVisitor INSTANCE = new MongoFilterPushDownVisitor();
+
+    private MongoFilterPushDownVisitor() {}
+
+    @Override
+    public BsonDocument visit(CallExpression call) {
+        Bson filter = Filters.empty();
+        if (BuiltInFunctionDefinitions.EQUALS.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$eq", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$lt", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$lte", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$gt", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$gte", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(call.getFunctionDefinition())) {
+            filter = renderBinaryComparisonOperator("$ne", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
+            filter =
+                    renderUnaryComparisonOperator(
+                            "$eq", call.getResolvedChildren().get(0), BsonNull.VALUE);
+        }
+        if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
+            filter =
+                    renderUnaryComparisonOperator(
+                            "$ne", call.getResolvedChildren().get(0), BsonNull.VALUE);
+        }
+        if (BuiltInFunctionDefinitions.OR.equals(call.getFunctionDefinition())) {
+            filter = renderLogicalOperator("$or", call.getResolvedChildren());
+        }
+        if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
+            filter = renderLogicalOperator("$and", call.getResolvedChildren());
+        }
+        return filter.toBsonDocument();
+    }
+
+    private Bson renderBinaryComparisonOperator(
+            String operator, List<ResolvedExpression> expressions) {
+        Optional<FieldReferenceExpression> fieldReferenceExpr =
+                extractExpression(expressions, FieldReferenceExpression.class);
+        Optional<ValueLiteralExpression> fieldValueExpr =
+                extractExpression(expressions, ValueLiteralExpression.class);
+
+        // Nested complex expressions are not supported. e.g. f1 = (f2 > 2)
+        if (!fieldReferenceExpr.isPresent() || !fieldValueExpr.isPresent()) {
+            return Filters.empty();
+        }
+
+        String fieldName = visit(fieldReferenceExpr.get()).getValue();
+        BsonValue fieldValue = visit(fieldValueExpr.get());
+
+        // Unsupported values
+        if (fieldValue.getBsonType() == BsonType.UNDEFINED) {
+            return Filters.empty();
+        }
+
+        switch (operator) {
+            case "$eq":
+                return Filters.eq(fieldName, fieldValue);
+            case "$lt":
+                return Filters.lt(fieldName, fieldValue);
+            case "$lte":
+                return Filters.lte(fieldName, fieldValue);
+            case "$gt":
+                return Filters.gt(fieldName, fieldValue);
+            case "$gte":
+                return Filters.gte(fieldName, fieldValue);
+            case "$ne":
+                return Filters.ne(fieldName, fieldValue);
+            default:
+                return Filters.empty();
+        }
+    }
+
+    private Bson renderUnaryComparisonOperator(
+            String operator, ResolvedExpression operand, BsonValue value) {
+        if (operand instanceof FieldReferenceExpression) {
+            String fieldName = visit((FieldReferenceExpression) operand).getValue();
+            switch (operator) {
+                case "$eq":
+                    return Filters.eq(fieldName, value);
+                case "$ne":
+                    return Filters.ne(fieldName, value);
+                default:
+                    return Filters.empty();
+            }
+        } else {
+            return Filters.empty();
+        }
+    }
+
+    private Bson renderLogicalOperator(String operator, List<ResolvedExpression> operands) {
+        Bson[] filters = new Bson[operands.size()];
+        for (int i = 0; i < operands.size(); i++) {
+            ResolvedExpression operand = operands.get(i);
+            BsonValue filter = operand.accept(this);
+
+            // sub-filters that cannot be pushed down
+            if (!filter.isDocument() || filter.asDocument().isEmpty()) {
+                return Filters.empty();
+            }
+            filters[i] = filter.asDocument();
+        }
+
+        switch (operator) {
+            case "$or":
+                return Filters.or(filters);
+            case "$and":
+                return Filters.and(filters);
+            default:
+                return Filters.empty();
+        }
+    }
+
+    @Override
+    public BsonValue visit(ValueLiteralExpression litExp) {
+        LogicalType type = litExp.getOutputDataType().getLogicalType();
+        Optional<BsonValue> value;
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                value = litExp.getValueAs(String.class).map(BsonString::new);
+                break;
+            case BOOLEAN:
+                value = litExp.getValueAs(Boolean.class).map(BsonBoolean::new);
+                break;
+            case DECIMAL:
+                value =
+                        litExp.getValueAs(BigDecimal.class)
+                                .map(Decimal128::new)
+                                .map(BsonDecimal128::new);
+                break;
+            case INTEGER:
+                value = litExp.getValueAs(Integer.class).map(BsonInt32::new);
+                break;
+            case BIGINT:
+                value = litExp.getValueAs(Long.class).map(BsonInt64::new);
+                break;
+            case DOUBLE:
+                value = litExp.getValueAs(Double.class).map(BsonDouble::new);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                value =
+                        litExp.getValueAs(LocalDateTime.class)
+                                .map(Timestamp::valueOf)
+                                .map(Timestamp::getTime)
+                                .map(BsonDateTime::new);
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                value =
+                        litExp.getValueAs(Instant.class)
+                                .map(Instant::toEpochMilli)
+                                .map(BsonDateTime::new);
+                break;
+            default:
+                // Use BsonUndefined to represent unsupported values.
+                value = Optional.of(new BsonUndefined());
+                break;
+        }
+        return value.orElse(BsonNull.VALUE);
+    }
+
+    @Override
+    public BsonString visit(FieldReferenceExpression fieldReference) {
+        return new BsonString(fieldReference.toString());
+    }
+
+    @Override
+    protected BsonDocument defaultMethod(Expression expression) {
+        return Filters.empty().toBsonDocument();
+    }
+
+    private static <T> Optional<T> extractExpression(
+            List<ResolvedExpression> expressions, Class<T> type) {
+        for (ResolvedExpression expression : expressions) {
+            if (type.isAssignableFrom(expression.getClass())) {
+                return Optional.of(type.cast(expression));
+            }
+        }
+        return Optional.empty();
+    }
+}
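
The literal conversion in visit(ValueLiteralExpression) drives what can be
pushed down; below is a hedged summary of the correspondence, constructing the
BSON values directly rather than invoking the visitor.

    import org.bson.BsonDateTime;
    import org.bson.BsonDecimal128;
    import org.bson.BsonInt32;
    import org.bson.BsonString;
    import org.bson.BsonUndefined;
    import org.bson.BsonValue;
    import org.bson.types.Decimal128;

    import java.math.BigDecimal;
    import java.time.Instant;

    public class LiteralMappingSketch {
        public static void main(String[] args) {
            // CHAR/VARCHAR -> BsonString, INTEGER -> BsonInt32,
            // DECIMAL -> BsonDecimal128, TIMESTAMP(_LTZ) -> BsonDateTime (epoch millis);
            // an unsupported literal becomes BsonUndefined, which makes
            // renderBinaryComparisonOperator fall back to Filters.empty(),
            // i.e. the predicate stays in Flink instead of being pushed down.
            BsonValue[] samples = {
                new BsonString("Halo"),
                new BsonInt32(6),
                new BsonDecimal128(new Decimal128(new BigDecimal("1.10"))),
                new BsonDateTime(Instant.parse("2022-09-07T10:25:28.127Z").toEpochMilli()),
                new BsonUndefined()
            };
            for (BsonValue v : samples) {
                System.out.println(v.getBsonType());
            }
        }
    }
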
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java
index 0578c57..57a49ff 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/source/MongoSourceITCase.java
@@ -76,7 +76,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for using Mongo Source. */
 @Testcontainers
-public class MongoSourceITCase {
+class MongoSourceITCase {
 
     private static final int PARALLELISM = 2;
 
@@ -175,7 +175,7 @@ public class MongoSourceITCase {
 
     @ParameterizedTest
     @MethodSource("providePartitionStrategyAndCollection")
-    public void testPartitionStrategy(PartitionStrategy partitionStrategy, String collection)
+    void testPartitionStrategy(PartitionStrategy partitionStrategy, String collection)
             throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -184,6 +184,7 @@ public class MongoSourceITCase {
                         .setPartitionSize(MemorySize.parse("1mb"))
                         .setSamplesPerPartition(3)
                         .setPartitionStrategy(partitionStrategy)
+                        .setFilter(Filters.gt("f0", new BsonInt32(10000)))
                         .build();
 
         List<RowData> results =
                 CollectionUtil.iteratorToList(
                         env.fromSource(
                                         mongoSource,
                                         WatermarkStrategy.noWatermarks(),
                                         "MongoDB-Source")
                                 .executeAndCollect());
 
-        assertThat(results).hasSize(TEST_RECORD_SIZE);
+        assertThat(results).hasSize(TEST_RECORD_SIZE - 10000);
     }
 
     @Test
-    public void testLimit() throws Exception {
+    void testLimit() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         final int limitSize = 100;
@@ -220,7 +221,7 @@ public class MongoSourceITCase {
     }
 
     @Test
-    public void testProject() throws Exception {
+    void testProject() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         MongoSource<String> mongoSource =
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
index e55ef6f..74e2de1 100644
--- a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
@@ -51,6 +51,7 @@ import org.bson.BsonDocument;
 import org.bson.BsonDouble;
 import org.bson.BsonInt32;
 import org.bson.BsonInt64;
+import org.bson.BsonNull;
 import org.bson.BsonString;
 import org.bson.BsonTimestamp;
 import org.bson.types.Decimal128;
@@ -68,17 +69,15 @@ import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
 import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
@@ -88,7 +87,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for {@link MongoDynamicTableSource}. */
 @Testcontainers
-public class MongoDynamicTableSourceITCase {
+class MongoDynamicTableSourceITCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSinkITCase.class);
 
@@ -108,8 +107,8 @@ public class MongoDynamicTableSourceITCase {
 
     private static MongoClient mongoClient;
 
-    public static StreamExecutionEnvironment env;
-    public static StreamTableEnvironment tEnv;
+    private static StreamExecutionEnvironment env;
+    private static StreamTableEnvironment tEnv;
 
     @BeforeAll
     static void beforeAll() {
@@ -121,8 +120,7 @@ public class MongoDynamicTableSourceITCase {
                         .getCollection(TEST_COLLECTION)
                         .withDocumentClass(BsonDocument.class);
 
-        List<BsonDocument> testRecords = Arrays.asList(createTestData(1), createTestData(2));
-        coll.insertMany(testRecords);
+        coll.insertMany(createTestData());
     }
 
     @AfterAll
@@ -136,70 +134,42 @@ public class MongoDynamicTableSourceITCase {
     void before() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env);
+        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
     }
 
     @Test
-    public void testSource() {
+    void testSource() {
         tEnv.executeSql(createTestDDl(null));
 
-        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source").collect();
-        List<String> result =
-                CollectionUtil.iteratorToList(collected).stream()
-                        .map(Row::toString)
-                        .sorted()
-                        .collect(Collectors.toList());
+        List<Row> result = executeQueryToList("SELECT * FROM mongo_source");
 
-        List<String> expected =
-                Stream.of(
-                                "+I[1, 2, false, [3], 6, 
2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13], 
[11_1, 11_2], [+I[12_1], +I[12_2]]]",
-                                "+I[2, 2, false, [3], 6, 
2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13], 
[11_1, 11_2], [+I[12_1], +I[12_2]]]")
-                        .sorted()
-                        .collect(Collectors.toList());
-
-        assertThat(result).isEqualTo(expected);
+        assertThat(result).isEqualTo(expectedRows());
     }
 
     @Test
-    public void testProject() {
+    void testProject() {
         tEnv.executeSql(createTestDDl(null));
 
-        Iterator<Row> collected = tEnv.executeSql("SELECT f1, f10 FROM mongo_source").collect();
-        List<String> result =
-                CollectionUtil.iteratorToList(collected).stream()
-                        .map(Row::toString)
-                        .sorted()
-                        .collect(Collectors.toList());
+        List<Row> result = executeQueryToList("SELECT f1, f10 FROM mongo_source");
 
-        List<String> expected =
-                Stream.of("+I[2, +I[13]]", "+I[2, +I[13]]").sorted().collect(Collectors.toList());
+        List<Row> expected = Arrays.asList(Row.of("2", Row.of(13)), Row.of("3", Row.of(14)));
 
         assertThat(result).isEqualTo(expected);
     }
 
     @Test
-    public void testLimit() {
+    void testLimit() {
         tEnv.executeSql(createTestDDl(null));
 
-        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source LIMIT 1").collect();
-        List<String> result =
-                CollectionUtil.iteratorToList(collected).stream()
-                        .map(Row::toString)
-                        .sorted()
-                        .collect(Collectors.toList());
-
-        Set<String> expected = new HashSet<>();
-        expected.add(
-                "+I[1, 2, false, [3], 6, 2022-09-07T10:25:28.127Z, 
2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13], [11_1, 11_2], [+I[12_1], 
+I[12_2]]]");
-        expected.add(
-                "+I[2, 2, false, [3], 6, 2022-09-07T10:25:28.127Z, 
2022-09-07T10:25:28Z, 0.9, 1.10, {k=12}, +I[13], [11_1, 11_2], [+I[12_1], 
+I[12_2]]]");
+        List<Row> result = executeQueryToList("SELECT * FROM mongo_source 
LIMIT 1");
 
         assertThat(result).hasSize(1);
-        assertThat(result).containsAnyElementsOf(expected);
+        assertThat(result).containsAnyElementsOf(expectedRows());
     }
 
     @ParameterizedTest
     @EnumSource(Caching.class)
-    public void testLookupJoin(Caching caching) throws Exception {
+    void testLookupJoin(Caching caching) throws Exception {
         // Create MongoDB lookup table
         Map<String, String> lookupOptions = new HashMap<>();
         if (caching.equals(Caching.ENABLE_CACHE)) {
@@ -239,20 +209,16 @@ public class MongoDynamicTableSourceITCase {
 
         // Execute lookup join
         try (CloseableIterator<Row> iterator =
-                tEnv.executeSql(
-                                "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM value_source"
-                                        + " AS S JOIN mongo_source for system_time as of S.proctime AS D ON S.id = D._id")
-                        .collect()) {
-            List<String> result =
-                    CollectionUtil.iteratorToList(iterator).stream()
-                            .map(Row::toString)
-                            .sorted()
-                            .collect(Collectors.toList());
-            List<String> expected =
+                executeQuery(
+                        "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM value_source"
+                                + " AS S JOIN mongo_source for system_time as of S.proctime AS D ON S.id = D._id")) {
+            List<Row> result = CollectionUtil.iteratorToList(iterator);
+
+            List<Row> expected =
                     Arrays.asList(
-                            "+I[1, Alice, 1, 2, false]",
-                            "+I[1, Alice, 1, 2, false]",
-                            "+I[2, Bob, 2, 2, false]");
+                            Row.of(1L, "Alice", 1L, "2", true),
+                            Row.of(1L, "Alice", 1L, "2", true),
+                            Row.of(2L, "Bob", 2L, "3", false));
 
             assertThat(result).hasSize(3);
             assertThat(result).isEqualTo(expected);
@@ -275,13 +241,89 @@ public class MongoDynamicTableSourceITCase {
         }
     }
 
+    @Test
+    void testFilter() {
+        tEnv.executeSql(createTestDDl(null));
+
+        // we create a VIEW here to test column remapping, i.e. whether filter pushdown still
+        // works if we create a view that depends on our source table
+        tEnv.executeSql(
+                "CREATE VIEW fake_table (idx, f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12)"
+                        + " as (SELECT * from mongo_source )");
+
+        List<Row> allRows = executeQueryToList("SELECT * FROM mongo_source");
+        assertThat(allRows).hasSize(2);
+
+        Row onlyRow1 =
+                allRows.stream()
+                        .filter(row -> row.getFieldAs(0).equals(1L))
+                        .findAny()
+                        .orElseThrow(NullPointerException::new);
+
+        Row onlyRow2 =
+                allRows.stream()
+                        .filter(row -> row.getFieldAs(0).equals(2L))
+                        .findAny()
+                        .orElseThrow(NullPointerException::new);
+
+        // test the EQUALS filter
+        assertThat(executeQueryToList("SELECT * FROM fake_table WHERE 1 = idx"))
+                .containsExactly(onlyRow1);
+
+        // test TIMESTAMP filter
+        assertThat(
+                        executeQueryToList(
+                                "SELECT * FROM fake_table WHERE f5 = TIMESTAMP 
'2022-09-07 10:25:28.127'"))
+                .containsExactly(onlyRow1);
+
+        // test the IN operator
+        assertThat(executeQueryToList("SELECT * FROM fake_table WHERE idx IN (2, 3)"))
+                .containsExactly(onlyRow2);
+
+        // test the NOT IN operator
+        assertThat(
+                        executeQueryToList(
+                                "SELECT * FROM fake_table WHERE f7 NOT IN 
(CAST(1.0 AS DOUBLE), CAST(1.1 AS DOUBLE))"))
+                .containsExactly(onlyRow1);
+
+        // test mixing AND and OR operator
+        assertThat(executeQueryToList("SELECT * FROM fake_table WHERE idx <> 1 OR f8 = 1.10"))
+                .containsExactlyInAnyOrderElementsOf(allRows);
+
+        // test mixing AND/OR with parentheses, and swapping the operands of the equals expression
+        assertThat(
+                        executeQueryToList(
+                                "SELECT * FROM fake_table WHERE (f0 IS NOT 
NULL AND f2 IS TRUE) OR f8 = 102.2"))
+                .containsExactly(onlyRow1);
+
+        // test Greater than and Less than
+        assertThat(executeQueryToList("SELECT * FROM fake_table WHERE f8 > 1.09 AND f8 < 1.11"))
+                .containsExactly(onlyRow1);
+
+        // One more test of parenthesis
+        assertThat(
+                        executeQueryToList(
+                                "SELECT * FROM fake_table WHERE f0 IS NULL AND 
(f8 >= 1.11 OR f4 <= 5)"))
+                .containsExactly(onlyRow2);
+
+        assertThat(
+                        executeQueryToList(
+                                "SELECT * FROM mongo_source WHERE _id = 2 AND 
f7 > 0.8 OR f7 < 1.1"))
+                .containsExactlyInAnyOrderElementsOf(allRows);
+
+        assertThat(
+                        executeQueryToList(
+                                "SELECT * FROM mongo_source WHERE 1 = _id AND 
f1 NOT IN ('2', '3')"))
+                .isEmpty();
+    }
+
     private static void validateCachedValues(LookupCache cache) {
         // mongo does support project push down, the cached row has been projected
         RowData key1 = GenericRowData.of(1L);
-        RowData value1 = GenericRowData.of(1L, StringData.fromString("2"), false);
+        RowData value1 = GenericRowData.of(1L, StringData.fromString("2"), true);
 
         RowData key2 = GenericRowData.of(2L);
-        RowData value2 = GenericRowData.of(2L, StringData.fromString("2"), false);
+        RowData value2 = GenericRowData.of(2L, StringData.fromString("3"), false);
 
         RowData key3 = GenericRowData.of(3L);
 
@@ -319,12 +361,13 @@ public class MongoDynamicTableSourceITCase {
                         "CREATE TABLE mongo_source",
                         "(",
                         "  _id BIGINT,",
+                        "  f0 STRING,",
                         "  f1 STRING,",
                         "  f2 BOOLEAN,",
                         "  f3 BINARY,",
                         "  f4 INTEGER,",
-                        "  f5 TIMESTAMP_LTZ(6),",
-                        "  f6 TIMESTAMP_LTZ(3),",
+                        "  f5 TIMESTAMP_LTZ(3),",
+                        "  f6 TIMESTAMP_LTZ(0),",
                         "  f7 DOUBLE,",
                         "  f8 DECIMAL(10, 2),",
                         "  f9 MAP<STRING, INTEGER>,",
@@ -336,29 +379,99 @@ public class MongoDynamicTableSourceITCase {
                         ")"));
     }
 
-    private static BsonDocument createTestData(long id) {
-        return new BsonDocument()
-                .append("_id", new BsonInt64(id))
-                .append("f1", new BsonString("2"))
-                .append("f2", BsonBoolean.FALSE)
-                .append("f3", new BsonBinary(new byte[] {(byte) 3}))
-                .append("f4", new BsonInt32(6))
-                // 2022-09-07T10:25:28.127Z
-                .append("f5", new BsonDateTime(1662546328127L))
-                .append("f6", new BsonTimestamp(1662546328, 0))
-                .append("f7", new BsonDouble(0.9d))
-                .append("f8", new BsonDecimal128(new Decimal128(new 
BigDecimal("1.10"))))
-                .append("f9", new BsonDocument("k", new BsonInt32(12)))
-                .append("f10", new BsonDocument("k", new BsonInt32(13)))
-                .append(
-                        "f11",
-                        new BsonArray(
-                                Arrays.asList(new BsonString("11_1"), new 
BsonString("11_2"))))
-                .append(
-                        "f12",
-                        new BsonArray(
-                                Arrays.asList(
-                                        new BsonDocument("k", new 
BsonString("12_1")),
-                                        new BsonDocument("k", new 
BsonString("12_2")))));
+    private static List<Row> expectedRows() {
+        return Arrays.asList(
+                Row.of(
+                        1L,
+                        "",
+                        "2",
+                        true,
+                        new byte[] {(byte) 3},
+                        6,
+                        Instant.ofEpochMilli(1662546328127L),
+                        Instant.ofEpochSecond(1662546328L),
+                        0.9d,
+                        new BigDecimal("1.10"),
+                        Collections.singletonMap("k", 12),
+                        Row.of(13),
+                        new String[] {"11_1", "11_2"},
+                        new Row[] {Row.of("12_1"), Row.of("12_2")}),
+                Row.of(
+                        2L,
+                        null,
+                        "3",
+                        false,
+                        new byte[] {(byte) 4},
+                        7,
+                        Instant.ofEpochMilli(1662546328128L),
+                        Instant.ofEpochSecond(1662546329L),
+                        1.0d,
+                        new BigDecimal("1.11"),
+                        Collections.singletonMap("k", 13),
+                        Row.of(14),
+                        new String[] {"11_3", "11_4"},
+                        new Row[] {Row.of("12_3"), Row.of("12_4")}));
+    }
+
+    private static List<BsonDocument> createTestData() {
+        return Arrays.asList(
+                new BsonDocument()
+                        .append("_id", new BsonInt64(1L))
+                        .append("f0", new BsonString(""))
+                        .append("f1", new BsonString("2"))
+                        .append("f2", BsonBoolean.TRUE)
+                        .append("f3", new BsonBinary(new byte[] {(byte) 3}))
+                        .append("f4", new BsonInt32(6))
+                        // 2022-09-07T10:25:28.127Z
+                        .append("f5", new BsonDateTime(1662546328127L))
+                        .append("f6", new BsonTimestamp(1662546328, 0))
+                        .append("f7", new BsonDouble(0.9d))
+                        .append("f8", new BsonDecimal128(new Decimal128(new 
BigDecimal("1.10"))))
+                        .append("f9", new BsonDocument("k", new BsonInt32(12)))
+                        .append("f10", new BsonDocument("k", new 
BsonInt32(13)))
+                        .append(
+                                "f11",
+                                new BsonArray(
+                                        Arrays.asList(
+                                                new BsonString("11_1"), new 
BsonString("11_2"))))
+                        .append(
+                                "f12",
+                                new BsonArray(
+                                        Arrays.asList(
+                                                new BsonDocument("k", new 
BsonString("12_1")),
+                                                new BsonDocument("k", new 
BsonString("12_2"))))),
+                new BsonDocument()
+                        .append("_id", new BsonInt64(2L))
+                        .append("f0", BsonNull.VALUE)
+                        .append("f1", new BsonString("3"))
+                        .append("f2", BsonBoolean.FALSE)
+                        .append("f3", new BsonBinary(new byte[] {(byte) 4}))
+                        .append("f4", new BsonInt32(7))
+                        // 2022-09-07T10:25:28.128Z
+                        .append("f5", new BsonDateTime(1662546328128L))
+                        .append("f6", new BsonTimestamp(1662546329, 0))
+                        .append("f7", new BsonDouble(1.0d))
+                        .append("f8", new BsonDecimal128(new Decimal128(new 
BigDecimal("1.11"))))
+                        .append("f9", new BsonDocument("k", new BsonInt32(13)))
+                        .append("f10", new BsonDocument("k", new 
BsonInt32(14)))
+                        .append(
+                                "f11",
+                                new BsonArray(
+                                        Arrays.asList(
+                                                new BsonString("11_3"), new 
BsonString("11_4"))))
+                        .append(
+                                "f12",
+                                new BsonArray(
+                                        Arrays.asList(
+                                                new BsonDocument("k", new 
BsonString("12_3")),
+                                                new BsonDocument("k", new 
BsonString("12_4"))))));
+    }
+
+    private static List<Row> executeQueryToList(String sql) {
+        return CollectionUtil.iteratorToList(executeQuery(sql));
+    }
+
+    private static CloseableIterator<Row> executeQuery(String sql) {
+        return tEnv.executeSql(sql).collect();
     }
 }
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
new file mode 100644
index 0000000..1dbb8e9
--- /dev/null
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoFilterPushDownVisitorTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.mongodb.client.model.Filters;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonNull;
+import org.bson.BsonString;
+import org.bson.conversions.Bson;
+import org.bson.types.Decimal128;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+
+import static org.apache.flink.connector.mongodb.table.MongoDynamicTableSource.parseFilter;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MongoFilterPushDownVisitor}. */
+class MongoFilterPushDownVisitorTest {
+
+    private static final String INPUT_TABLE = "mongo_source";
+
+    private static final BsonDocument EMPTY_FILTER = Filters.empty().toBsonDocument();
+
+    private static StreamExecutionEnvironment env;
+    private static StreamTableEnvironment tEnv;
+
+    private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+    @BeforeEach
+    void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+
+        // Create table in Flink, this can be reused across test cases
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id INTEGER,"
+                        + "description VARCHAR(200),"
+                        + "boolean_col BOOLEAN,"
+                        + "timestamp_col TIMESTAMP(0),"
+                        + "timestamp3_col TIMESTAMP(3),"
+                        + "double_col DOUBLE,"
+                        + "decimal_col DECIMAL(10, 4)"
+                        + ") WITH ("
+                        + "  'connector'='mongodb',"
+                        + "  'uri'='mongodb://127.0.0.1:27017',"
+                        + "  'database'='test_db',"
+                        + "  'collection'='test_coll'"
+                        + ")");
+    }
+
+    @Test
+    void testSimpleExpressionPrimitiveType() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        Arrays.asList(
+                        new Object[] {"id = 6", Filters.eq("id", new 
BsonInt32(6))},
+                        new Object[] {"id >= 6", Filters.gte("id", new 
BsonInt32(6))},
+                        new Object[] {"id > 6", Filters.gt("id", new 
BsonInt32(6))},
+                        new Object[] {"id < 6", Filters.lt("id", new 
BsonInt32(6))},
+                        new Object[] {"id <= 5", Filters.lte("id", 5)},
+                        new Object[] {
+                            "description = 'Halo'",
+                            Filters.eq("description", new BsonString("Halo"))
+                        },
+                        new Object[] {
+                            "boolean_col = true", Filters.eq("boolean_col", 
new BsonBoolean(true))
+                        },
+                        new Object[] {
+                            "boolean_col = false", Filters.eq("boolean_col", 
new BsonBoolean(false))
+                        },
+                        new Object[] {
+                            "double_col > 0.5",
+                            Filters.gt(
+                                    "double_col",
+                                    new BsonDecimal128(new Decimal128(new BigDecimal("0.5"))))
+                        },
+                        new Object[] {
+                            "decimal_col <= -0.3",
+                            Filters.lte(
+                                    "decimal_col",
+                                    new BsonDecimal128(new Decimal128(new BigDecimal("-0.3"))))
+                        })
+                .forEach(
+                        inputs ->
+                                assertGeneratedFilter(
+                                        (String) inputs[0],
+                                        schema,
+                                        ((Bson) inputs[1]).toBsonDocument()));
+    }
+
+    @Test
+    void testComplexExpressionDatetime() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+        assertGeneratedFilter(
+                "id = 6 AND timestamp_col = TIMESTAMP '2022-01-01 07:00:01'",
+                schema,
+                Filters.and(
+                                Filters.eq("id", new BsonInt32(6)),
+                                Filters.eq(
+                                        "timestamp_col",
+                                        new BsonDateTime(
+                                                Timestamp.valueOf("2022-01-01 07:00:01")
+                                                        .getTime())))
+                        .toBsonDocument());
+
+        assertGeneratedFilter(
+                "timestamp3_col = TIMESTAMP '2022-01-01 07:00:01.333' OR 
description = 'Halo'",
+                schema,
+                Filters.or(
+                                Filters.eq(
+                                        "timestamp3_col",
+                                        new BsonDateTime(
+                                                Timestamp.valueOf("2022-01-01 07:00:01.333")
+                                                        .getTime())),
+                                Filters.eq("description", new 
BsonString("Halo")))
+                        .toBsonDocument());
+    }
+
+    @Test
+    void testExpressionWithNull() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+
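+        // Note: "= NULL" translates to the same null-equality filter as IS NULL
+        // (compare testExpressionIsNull below).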
+        assertGeneratedFilter(
+                "id = NULL AND decimal_col <= 0.6",
+                schema,
+                Filters.and(
+                                Filters.eq("id", BsonNull.VALUE),
+                                Filters.lte(
+                                        "decimal_col",
+                                        new BsonDecimal128(new Decimal128(new BigDecimal("0.6")))))
+                        .toBsonDocument());
+
+        assertGeneratedFilter(
+                "id = 6 OR description = NULL",
+                schema,
+                Filters.or(
+                                Filters.eq("id", new BsonInt32(6)),
+                                Filters.eq("description", BsonNull.VALUE))
+                        .toBsonDocument());
+    }
+
+    @Test
+    void testExpressionIsNull() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+
+        assertGeneratedFilter(
+                "id IS NULL AND decimal_col <= 0.6",
+                schema,
+                Filters.and(
+                                Filters.eq("id", BsonNull.VALUE),
+                                Filters.lte(
+                                        "decimal_col",
+                                        new BsonDecimal128(new Decimal128(new BigDecimal("0.6")))))
+                        .toBsonDocument());
+
+        assertGeneratedFilter(
+                "id = 6 OR description IS NOT NULL",
+                schema,
+                Filters.or(
+                                Filters.eq("id", new BsonInt32(6)),
+                                Filters.ne("description", BsonNull.VALUE))
+                        .toBsonDocument());
+    }
+
+    @Test
+    void testExpressionCannotBePushedDown() {
+        ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
+
+        // unsupported operators
+        assertGeneratedFilter("description LIKE '_bcd%'", schema, 
EMPTY_FILTER);
+
+        // nested complex expressions
+        assertGeneratedFilter("double_col = decimal_col", schema, 
EMPTY_FILTER);
+        assertGeneratedFilter("boolean_col = (decimal_col > 2.0)", schema, 
EMPTY_FILTER);
+
+        // partial push down
+        assertGeneratedFilter(
+                "id IS NULL AND description LIKE '_bcd%'",
+                schema, Filters.eq("id", BsonNull.VALUE).toBsonDocument());
+
+        // sub filter cannot be pushed down
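+        // (an OR is only pushed down when every branch is convertible, whereas
+        // AND conjuncts can be accepted individually, as in the partial case above)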
+        assertGeneratedFilter("id IS NOT NULL OR double_col = decimal_col", 
schema, EMPTY_FILTER);
+    }
+
+    private void assertGeneratedFilter(
+            String inputExpr, ResolvedSchema schema, BsonDocument expected) {
+        List<ResolvedExpression> filters = resolveSQLFilterToExpression(inputExpr, schema);
+
+        List<Bson> mongoFilters = new ArrayList<>();
+        for (ResolvedExpression filter : filters) {
+            BsonDocument simpleFilter = parseFilter(filter);
+            if (!simpleFilter.isEmpty()) {
+                mongoFilters.add(simpleFilter);
+            }
+        }
+
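+        // Combine the conjuncts that survived conversion into a single filter,
+        // using $and when more than one document remains.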
+        BsonDocument actual = EMPTY_FILTER;
+        if (!mongoFilters.isEmpty()) {
+            Bson mergedFilter =
+                    mongoFilters.size() == 1 ? mongoFilters.get(0) : Filters.and(mongoFilters);
+            actual = mergedFilter.toBsonDocument();
+        }
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    /**
+     * Resolves a SQL filter expression against a schema. This method relies on
+     * implementation details of the Flink planner.
+     */
+    private List<ResolvedExpression> resolveSQLFilterToExpression(
+            String sqlExp, ResolvedSchema schema) {
+        StreamTableEnvironmentImpl tbImpl = (StreamTableEnvironmentImpl) tEnv;
+
+        FlinkContext ctx = ((PlannerBase) tbImpl.getPlanner()).getFlinkContext();
+        CatalogManager catMan = tbImpl.getCatalogManager();
+        FunctionCatalog funCat = ctx.getFunctionCatalog();
+        RowType sourceType = (RowType) schema.toSourceRowDataType().getLogicalType();
+
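+        // Build a RexNode-to-Expression converter over the source row type,
+        // using the session time zone for temporal literal conversion.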
+        FlinkTypeFactory typeFactory = new FlinkTypeFactory(classLoader, FlinkTypeSystem.INSTANCE);
+        RexBuilder rexBuilder = new RexBuilder(typeFactory);
+        RexNodeToExpressionConverter converter =
+                new RexNodeToExpressionConverter(
+                        rexBuilder,
+                        sourceType.getFieldNames().toArray(new String[0]),
+                        funCat,
+                        catMan,
+                        TimeZone.getTimeZone(tEnv.getConfig().getLocalTimeZone()));
+
+        RexNodeExpression rexExp =
+                (RexNodeExpression) tbImpl.getParser().parseSqlExpression(sqlExp, sourceType, null);
+
+        RexNode cnf = FlinkRexUtil.toCnf(rexBuilder, -1, rexExp.getRexNode());
+        // converts the cnf condition to a list of AND conditions
+        List<RexNode> conjunctions = RelOptUtil.conjunctions(cnf);
+
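+        // Convert each conjunction back to a Table API Expression; conjunctions
+        // the converter cannot express are skipped (empty Option).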
+        List<Expression> resolvedExps =
+                conjunctions.stream()
+                        .map(rex -> rex.accept(converter))
+                        .filter(Option::isDefined)
+                        .map(Option::get)
+                        .collect(Collectors.toList());
+
+        ExpressionResolver resolver =
+                ExpressionResolver.resolverFor(
+                                tEnv.getConfig(),
+                                classLoader,
+                                name -> Optional.empty(),
+                                funCat.asLookup(
+                                        str -> {
+                                            throw new TableException(
+                                                    "We should not need to 
lookup any expressions at this point");
+                                        }),
+                                catMan.getDataTypeFactory(),
+                                (sqlExpression, inputRowType, outputType) -> {
+                                    throw new TableException(
+                                            "SQL expression parsing is not 
supported at this location.");
+                                })
+                        .build();
+
+        return resolver.resolve(resolvedExps);
+    }
+}
diff --git a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
new file mode 100644
index 0000000..133cbe7
--- /dev/null
+++ b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.rules.TestName;
+
+import java.time.ZoneId;
+
+/** Plan tests for the MongoDB connector, for example, testing filter push down. */
+public class MongoTablePlanTest extends TableTestBase {
+
+    private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
+
+    private TestInfo testInfo;
+
+    @BeforeEach
+    public void setup(TestInfo testInfo) {
+        this.testInfo = testInfo;
+        TableEnvironment tEnv = util.tableEnv();
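+        // Pin the session time zone so TIMESTAMP_LTZ literals in the expected
+        // plans stay stable across environments.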
+        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+        tEnv.executeSql(
+                "CREATE TABLE mongo ("
+                        + "id BIGINT,"
+                        + "description VARCHAR(200),"
+                        + "boolean_col BOOLEAN,"
+                        + "timestamp_col TIMESTAMP_LTZ(0),"
+                        + "timestamp3_col TIMESTAMP_LTZ(3),"
+                        + "int_col INTEGER,"
+                        + "double_col DOUBLE,"
+                        + "decimal_col DECIMAL(10, 4)"
+                        + ") WITH ("
+                        + "  'connector'='mongodb',"
+                        + "  'uri'='mongodb://127.0.0.1:27017',"
+                        + "  'database'='test_db',"
+                        + "  'collection'='test_coll'"
+                        + ")");
+    }
+
+    @Test
+    public void testFilterPushdown() {
+        util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 
900001 AND timestamp3_col <> TIMESTAMP '2022-09-07 10:25:28.127' OR double_col 
>= -1000.23");
+    }
+
+    @Test
+    public void testFilterPartialPushdown() {
+        util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 
900001 AND boolean_col = (decimal_col > 2.0)");
+    }
+
+    @Test
+    public void testFilterCannotPushdown() {
+        util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id IS NOT 
NULL OR double_col = decimal_col");
+    }
+
+    // A workaround to get the test method name for Flink versions not completely migrated to JUnit 5.
+    public TestName name() {
+        return new TestName() {
+            @Override
+            public String getMethodName() {
+                return testInfo.getTestMethod().get().getName();
+            }
+        };
+    }
+}
diff --git a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
new file mode 100644
index 0000000..b2c57e4
--- /dev/null
+++ b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testFilterPushdown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001 AND timestamp3_col <> TIMESTAMP '2022-09-07 10:25:28.127' OR double_col >= -1000.23]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($4, CAST(2022-09-07 10:25:28.127:TIMESTAMP(3)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)), >=($6, -1000.23:DECIMAL(6, 2)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, mongo, filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), OR(<>(timestamp3_col, 2022-09-07 10:25:28.127:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), >=(double_col, -1000.23:DECIMAL(6, 2))))], project=[id, timestamp3_col, int_col]]], fields=[id, timestamp3_col, int_col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterPartialPushdown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001 AND boolean_col = (decimal_col > 2.0)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[AND(=($0, 900001), =($2, >($7, 2.0:DECIMAL(2, 1))))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[CAST(900001 AS BIGINT) AS id, timestamp3_col, int_col], where=[(boolean_col = (decimal_col > 2.0))])
++- TableSourceScan(table=[[default_catalog, default_database, mongo, filter=[=(id, 900001:BIGINT)], project=[boolean_col, timestamp3_col, int_col, decimal_col]]], fields=[boolean_col, timestamp3_col, int_col, decimal_col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterCannotPushdown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id IS NOT NULL OR double_col = decimal_col]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[OR(IS NOT NULL($0), =($6, CAST($7):DOUBLE))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[id, timestamp3_col, int_col], where=[(id IS NOT NULL OR (double_col = CAST(decimal_col AS DOUBLE)))])
++- TableSourceScan(table=[[default_catalog, default_database, mongo, filter=[], project=[id, timestamp3_col, int_col, double_col, decimal_col]]], fields=[id, timestamp3_col, int_col, double_col, decimal_col])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/pom.xml b/pom.xml
index 7d2414b..9b470ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,8 @@ under the License.
                <mongodb.version>4.7.2</mongodb.version>
 
                <flink.version>1.17.0</flink.version>
-
+               <scala.binary.version>2.12</scala.binary.version>
+               <scala-library.version>2.12.7</scala-library.version>
                <junit5.version>5.8.1</junit5.version>
                <assertj.version>3.21.0</assertj.version>
                <testcontainers.version>1.17.2</testcontainers.version>
@@ -311,6 +312,13 @@ under the License.
                                <version>2.1</version>
                        </dependency>
 
+                       <!-- For dependency convergence -->
+                       <dependency>
+                               <groupId>org.scala-lang</groupId>
+                               <artifactId>scala-library</artifactId>
+                               <version>${scala-library.version}</version>
+                       </dependency>
+
                        <dependency>
                                <groupId>org.testcontainers</groupId>
                                <artifactId>testcontainers-bom</artifactId>
