This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit c843f966a3b15ded41a72c2baf16b94e02b1839b
Author: Alibaba-HZY <[email protected]>
AuthorDate: Mon Mar 3 11:36:23 2025 +0800

    [common] Introduce Predicate interface and basic predicate expressions 
(#515)
---
 .../main/java/com/alibaba/fluss/predicate/And.java |  67 +++
 .../com/alibaba/fluss/predicate/CompareUtils.java  |  60 +++
 .../alibaba/fluss/predicate/CompoundPredicate.java | 114 ++++++
 .../java/com/alibaba/fluss/predicate/Contains.java |  62 +++
 .../java/com/alibaba/fluss/predicate/EndsWith.java |  65 +++
 .../java/com/alibaba/fluss/predicate/Equal.java    |  58 +++
 .../java/com/alibaba/fluss/predicate/FieldRef.java |  79 ++++
 .../alibaba/fluss/predicate/FunctionVisitor.java   |  83 ++++
 .../alibaba/fluss/predicate/GreaterOrEqual.java    |  58 +++
 .../com/alibaba/fluss/predicate/GreaterThan.java   |  58 +++
 .../main/java/com/alibaba/fluss/predicate/In.java  |  83 ++++
 .../com/alibaba/fluss/predicate/IsNotNull.java     |  55 +++
 .../java/com/alibaba/fluss/predicate/IsNull.java   |  55 +++
 .../com/alibaba/fluss/predicate/LeafFunction.java  |  65 +++
 .../com/alibaba/fluss/predicate/LeafPredicate.java | 179 ++++++++
 .../alibaba/fluss/predicate/LeafUnaryFunction.java |  53 +++
 .../com/alibaba/fluss/predicate/LessOrEqual.java   |  58 +++
 .../java/com/alibaba/fluss/predicate/LessThan.java |  58 +++
 .../java/com/alibaba/fluss/predicate/NotEqual.java |  58 +++
 .../java/com/alibaba/fluss/predicate/NotIn.java    |  82 ++++
 .../predicate/NullFalseLeafBinaryFunction.java     |  61 +++
 .../main/java/com/alibaba/fluss/predicate/Or.java  |  67 +++
 .../fluss/predicate/PartitionPredicateVisitor.java |  51 +++
 .../com/alibaba/fluss/predicate/Predicate.java     |  50 +++
 .../alibaba/fluss/predicate/PredicateBuilder.java  | 454 +++++++++++++++++++++
 .../alibaba/fluss/predicate/PredicateVisitor.java  |  30 ++
 .../com/alibaba/fluss/predicate/StartsWith.java    |  69 ++++
 .../fluss/predicate/UnsupportedExpression.java     |  29 ++
 .../main/java/com/alibaba/fluss/types/RowType.java |  10 +
 .../com/alibaba/fluss/utils/BinaryStringUtils.java |   8 +
 .../com/alibaba/fluss/utils/DateTimeUtils.java     |  10 +
 .../fluss/predicate/PredicateBuilderTest.java      |  85 ++++
 .../com/alibaba/fluss/predicate/PredicateTest.java | 424 +++++++++++++++++++
 fluss-test-coverage/pom.xml                        |   2 +
 34 files changed, 2800 insertions(+)

diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java
new file mode 100644
index 000000000..204bb7e3d
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/And.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * The conjunction {@link CompoundPredicate.Function}: a row is accepted only when every child
+ * predicate accepts it.
+ */
+public class And extends CompoundPredicate.Function {
+
+    private static final long serialVersionUID = -2977938814804928712L;
+
+    public static final And INSTANCE = new And();
+
+    private And() {}
+
+    /**
+     * Evaluates the children in list order and returns {@code false} at the first child that
+     * rejects {@code row}. Returns {@code true} when no child rejects it, which includes an empty
+     * child list.
+     */
+    @Override
+    public boolean test(InternalRow row, List<Predicate> children) {
+        for (Predicate predicate : children) {
+            boolean accepted = predicate.test(row);
+            if (!accepted) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the negation as an {@link Or} over the negated children. The result is empty as soon
+     * as one child reports that it cannot be negated.
+     */
+    @Override
+    public Optional<Predicate> negate(List<Predicate> children) {
+        List<Predicate> inverted = new ArrayList<>();
+        for (Predicate predicate : children) {
+            Optional<Predicate> inverse = predicate.negate();
+            if (!inverse.isPresent()) {
+                return Optional.empty();
+            }
+            inverted.add(inverse.get());
+        }
+        Predicate disjunction = new CompoundPredicate(Or.INSTANCE, inverted);
+        return Optional.of(disjunction);
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, List<T> children) {
+        return visitor.visitAnd(children);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java
new file mode 100644
index 000000000..50380abf8
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompareUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.types.DataType;
+
+import static java.lang.Math.min;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Utilities for comparing predicate literals with field values. */
+public class CompareUtils {
+    private CompareUtils() {}
+
+    /**
+     * Compares two values.
+     *
+     * <p>When {@code v1} is {@link Comparable}, the result comes from {@link
+     * Comparable#compareTo(Object)}; a {@link BinaryString} on either side is converted to {@link
+     * String} first, so a {@code String} can be compared with a {@code BinaryString}. When {@code
+     * v1} is a {@code byte[]}, both values are compared byte by byte as signed bytes, and the
+     * array lengths break the tie when one array is a prefix of the other.
+     *
+     * @param type the data type of the values; only used in the error message
+     * @param v1 the left-hand value
+     * @param v2 the right-hand value
+     * @return a negative integer, zero or a positive integer as {@code v1} is less than, equal to
+     *     or greater than {@code v2}
+     * @throws RuntimeException if {@code v1} is {@code null} or is neither {@link Comparable} nor
+     *     a {@code byte[]}
+     */
+    public static int compareLiteral(DataType type, Object v1, Object v2) {
+        if (v1 instanceof Comparable) {
+            // BinaryString cannot be serialized, so one side may be a String while the other is
+            // still a BinaryString. Normalize both sides to String before comparing.
+            if (v1 instanceof BinaryString) {
+                v1 = v1.toString();
+            }
+            if (v2 instanceof BinaryString) {
+                v2 = v2.toString();
+            }
+            // The cast cannot be checked at runtime; compareTo itself rejects mismatched types.
+            @SuppressWarnings("unchecked")
+            Comparable<Object> left = (Comparable<Object>) v1;
+            return left.compareTo(v2);
+        } else if (v1 instanceof byte[]) {
+            return compare((byte[]) v1, (byte[]) v2);
+        } else {
+            throw new RuntimeException("Unsupported type: " + type);
+        }
+    }
+
+    /** Compares two byte arrays element by element (signed), then by length. */
+    private static int compare(byte[] first, byte[] second) {
+        // The common prefix length is loop-invariant, so compute it once.
+        int commonLength = min(first.length, second.length);
+        for (int i = 0; i < commonLength; i++) {
+            int cmp = Byte.compare(first[i], second[i]);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return first.length - second.length;
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java
new file mode 100644
index 000000000..449598f2f
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/CompoundPredicate.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * An inner node of a {@link Predicate} tree. It pairs a {@link Function} with a list of child
+ * predicates and delegates evaluation and negation to that function.
+ */
+public class CompoundPredicate implements Predicate {
+
+    private final Function function;
+    private final List<Predicate> children;
+
+    public CompoundPredicate(Function function, List<Predicate> children) {
+        this.function = function;
+        this.children = children;
+    }
+
+    /** Returns the function that combines the children. */
+    public Function function() {
+        return function;
+    }
+
+    /** Returns the child list exactly as it was handed to the constructor (not a copy). */
+    public List<Predicate> children() {
+        return children;
+    }
+
+    @Override
+    public boolean test(InternalRow row) {
+        return function.test(row, children);
+    }
+
+    @Override
+    public Optional<Predicate> negate() {
+        return function.negate(children);
+    }
+
+    @Override
+    public <T> T visit(PredicateVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** Two compound predicates are equal when their functions and child lists are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof CompoundPredicate) {
+            CompoundPredicate other = (CompoundPredicate) o;
+            return Objects.equals(function, other.function)
+                    && Objects.equals(children, other.children);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(function, children);
+    }
+
+    @Override
+    public String toString() {
+        return function + "(" + children + ")";
+    }
+
+    /** Combines the results of several {@link Predicate}s into a single result. */
+    public abstract static class Function implements Serializable {
+
+        public abstract boolean test(InternalRow row, List<Predicate> children);
+
+        public abstract Optional<Predicate> negate(List<Predicate> children);
+
+        public abstract <T> T visit(FunctionVisitor<T> visitor, List<T> children);
+
+        /** Hashes on the class name only, matching the class-based {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            String className = this.getClass().getName();
+            return className.hashCode();
+        }
+
+        /** Functions are equal exactly when they are instances of the same class. */
+        @Override
+        public boolean equals(Object o) {
+            return this == o || (o != null && getClass() == o.getClass());
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName();
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java
new file mode 100644
index 000000000..cfdaf0a06
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A {@link NullFalseLeafBinaryFunction} to evaluate {@code filter like '%abc%'}. */
+public class Contains extends NullFalseLeafBinaryFunction {
+
+    public static final Contains INSTANCE = new Contains();
+
+    private Contains() {}
+
+    /**
+     * Tests whether the string form of {@code field} contains the string form of {@code
+     * patternLiteral}.
+     *
+     * <p>Both operands are converted with {@code toString()} rather than casting the literal to
+     * {@link String}. A {@code String} literal behaves exactly as before, while a literal of
+     * another type (for example a {@code BinaryString}) no longer fails with a {@link
+     * ClassCastException}.
+     *
+     * @param type the data type of the field; not inspected here
+     * @param field the field value
+     * @param patternLiteral the substring to look for
+     * @return {@code true} if the field's string form contains the literal's string form
+     */
+    @Override
+    public boolean test(DataType type, Object field, Object patternLiteral) {
+        // NOTE(review): both operands are dereferenced here, so this relies on the caller never
+        // passing null - presumably guaranteed by NullFalseLeafBinaryFunction; confirm.
+        String fieldString = field.toString();
+        String pattern = patternLiteral.toString();
+        return fieldString.contains(pattern);
+    }
+
+    /** Always returns {@code true}; none of the arguments are inspected. */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            Object patternLiteral) {
+        return true;
+    }
+
+    /** Returns an empty {@link Optional}: this function provides no negated counterpart. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.empty();
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitContains(fieldRef, literals.get(0));
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java
new file mode 100644
index 000000000..a6af4c2e1
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link NullFalseLeafBinaryFunction} to evaluate {@code filter like '%abc' or filter like
+ * '_abc'}.
+ */
+public class EndsWith extends NullFalseLeafBinaryFunction {
+
+    public static final EndsWith INSTANCE = new EndsWith();
+
+    private EndsWith() {}
+
+    /**
+     * Tests whether the string form of {@code field} ends with the string form of {@code
+     * patternLiteral}.
+     *
+     * <p>Both operands are converted with {@code toString()} rather than casting the literal to
+     * {@link String}. A {@code String} literal behaves exactly as before, while a literal of
+     * another type (for example a {@code BinaryString}) no longer fails with a {@link
+     * ClassCastException}.
+     *
+     * @param type the data type of the field; not inspected here
+     * @param field the field value
+     * @param patternLiteral the suffix to look for
+     * @return {@code true} if the field's string form ends with the literal's string form
+     */
+    @Override
+    public boolean test(DataType type, Object field, Object patternLiteral) {
+        // NOTE(review): both operands are dereferenced here, so this relies on the caller never
+        // passing null - presumably guaranteed by NullFalseLeafBinaryFunction; confirm.
+        String fieldString = field.toString();
+        String suffix = patternLiteral.toString();
+        return fieldString.endsWith(suffix);
+    }
+
+    /** Always returns {@code true}; none of the arguments are inspected. */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            Object patternLiteral) {
+        return true;
+    }
+
+    /** Returns an empty {@link Optional}: this function provides no negated counterpart. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.empty();
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitEndsWith(fieldRef, literals.get(0));
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java
new file mode 100644
index 000000000..26b181749
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Equal.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A {@link NullFalseLeafBinaryFunction} that checks a field for equality with a literal. */
+public class Equal extends NullFalseLeafBinaryFunction {
+
+    public static final Equal INSTANCE = new Equal();
+
+    private Equal() {}
+
+    /** Returns {@code true} when {@code literal} and {@code field} compare as equal. */
+    @Override
+    public boolean test(DataType type, Object field, Object literal) {
+        int cmp = compareLiteral(type, literal, field);
+        return cmp == 0;
+    }
+
+    /**
+     * Returns {@code true} when {@code literal} lies in the closed range {@code [min, max]}.
+     * {@code rowCount} and {@code nullCount} are not inspected.
+     */
+    @Override
+    public boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) {
+        // Check the lower bound first; the upper bound is only compared when it still matters.
+        if (compareLiteral(type, literal, min) < 0) {
+            return false;
+        }
+        return compareLiteral(type, literal, max) <= 0;
+    }
+
+    /** The negation of equality is {@link NotEqual}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(NotEqual.INSTANCE);
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        Object literal = literals.get(0);
+        return visitor.visitEqual(fieldRef, literal);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java
new file mode 100644
index 000000000..d98dc92ba
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FieldRef.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Identifies a field of an input by its index, name and data type. */
+public class FieldRef implements Serializable {
+
+    private static final long serialVersionUID = 4982103776651292199L;
+
+    private final int index;
+    private final String name;
+    private final DataType type;
+
+    public FieldRef(int index, String name, DataType type) {
+        this.index = index;
+        this.name = name;
+        this.type = type;
+    }
+
+    /** Returns the index of the referenced field. */
+    public int index() {
+        return index;
+    }
+
+    /** Returns the name of the referenced field. */
+    public String name() {
+        return name;
+    }
+
+    /** Returns the data type of the referenced field. */
+    public DataType type() {
+        return type;
+    }
+
+    /** Two references are equal when index, name and type all match. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        FieldRef other = (FieldRef) o;
+        if (index != other.index) {
+            return false;
+        }
+        return Objects.equals(name, other.name) && Objects.equals(type, other.type);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(index, name, type);
+    }
+
+    @Override
+    public String toString() {
+        return "FieldRef{index=" + index + ", name='" + name + "', type=" + type + "}";
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java
new file mode 100644
index 000000000..806f2de37
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/FunctionVisitor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link PredicateVisitor} that dispatches on the function of each predicate, so implementors
+ * provide one callback per function instead of handling the predicate nodes themselves.
+ */
+public interface FunctionVisitor<T> extends PredicateVisitor<T> {
+
+    /** Hands the leaf's field reference and literals to the leaf's function. */
+    @Override
+    default T visit(LeafPredicate predicate) {
+        return predicate
+                .function()
+                .visit(this, predicate.fieldRef(), predicate.literals());
+    }
+
+    /**
+     * Visits every child with this visitor, in list order, and hands the collected results to the
+     * compound predicate's function.
+     */
+    @Override
+    default T visit(CompoundPredicate predicate) {
+        CompoundPredicate.Function function = predicate.function();
+        List<T> childResults =
+                predicate.children().stream()
+                        .map(child -> child.visit(this))
+                        .collect(Collectors.toList());
+        return function.visit(this, childResults);
+    }
+
+    // ----------------- Unary functions ------------------------
+
+    T visitIsNotNull(FieldRef fieldRef);
+
+    T visitIsNull(FieldRef fieldRef);
+
+    // ----------------- Binary functions ------------------------
+
+    T visitStartsWith(FieldRef fieldRef, Object literal);
+
+    T visitEndsWith(FieldRef fieldRef, Object literal);
+
+    T visitContains(FieldRef fieldRef, Object literal);
+
+    T visitLessThan(FieldRef fieldRef, Object literal);
+
+    T visitGreaterOrEqual(FieldRef fieldRef, Object literal);
+
+    T visitNotEqual(FieldRef fieldRef, Object literal);
+
+    T visitLessOrEqual(FieldRef fieldRef, Object literal);
+
+    T visitEqual(FieldRef fieldRef, Object literal);
+
+    T visitGreaterThan(FieldRef fieldRef, Object literal);
+
+    // ----------------- Other functions ------------------------
+
+    T visitIn(FieldRef fieldRef, List<Object> literals);
+
+    T visitNotIn(FieldRef fieldRef, List<Object> literals);
+
+    // ----------------- Compound functions ------------------------
+
+    T visitAnd(List<T> children);
+
+    T visitOr(List<T> children);
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java
new file mode 100644
index 000000000..0a915355f
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterOrEqual.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link NullFalseLeafBinaryFunction} that checks whether a field is greater than or equal to a
+ * literal.
+ */
+public class GreaterOrEqual extends NullFalseLeafBinaryFunction {
+
+    public static final GreaterOrEqual INSTANCE = new GreaterOrEqual();
+
+    private GreaterOrEqual() {}
+
+    /** Returns {@code true} when {@code literal} compares less than or equal to {@code field}. */
+    @Override
+    public boolean test(DataType type, Object field, Object literal) {
+        int cmp = compareLiteral(type, literal, field);
+        return cmp <= 0;
+    }
+
+    /**
+     * Returns {@code true} when {@code literal} compares less than or equal to {@code max}; the
+     * other statistics are not inspected.
+     */
+    @Override
+    public boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) {
+        int cmp = compareLiteral(type, literal, max);
+        return cmp <= 0;
+    }
+
+    /** The negation of greater-or-equal is {@link LessThan}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(LessThan.INSTANCE);
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        Object literal = literals.get(0);
+        return visitor.visitGreaterOrEqual(fieldRef, literal);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java
new file mode 100644
index 000000000..f92c84c9b
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/GreaterThan.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link NullFalseLeafBinaryFunction} that checks whether a field is strictly greater than a
+ * literal.
+ */
+public class GreaterThan extends NullFalseLeafBinaryFunction {
+
+    public static final GreaterThan INSTANCE = new GreaterThan();
+
+    private GreaterThan() {}
+
+    /** Returns {@code true} when {@code literal} compares strictly less than {@code field}. */
+    @Override
+    public boolean test(DataType type, Object field, Object literal) {
+        int cmp = compareLiteral(type, literal, field);
+        return cmp < 0;
+    }
+
+    /**
+     * Returns {@code true} when {@code literal} compares strictly less than {@code max}; the
+     * other statistics are not inspected.
+     */
+    @Override
+    public boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) {
+        int cmp = compareLiteral(type, literal, max);
+        return cmp < 0;
+    }
+
+    /** The negation of greater-than is {@link LessOrEqual}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(LessOrEqual.INSTANCE);
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        Object literal = literals.get(0);
+        return visitor.visitGreaterThan(fieldRef, literal);
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java
new file mode 100644
index 000000000..672a1c797
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/In.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link LeafFunction} to eval in: the test passes when at least one non-null literal compares
+ * equal to the field.
+ */
+public class In extends LeafFunction {
+
+    private static final long serialVersionUID = -9115697441080586485L;
+
+    public static final In INSTANCE = new In();
+
+    private In() {}
+
+    /**
+     * Row-level check. A null field never matches; null literals are skipped; the first literal
+     * for which {@code compareLiteral} returns 0 makes the test pass.
+     */
+    @Override
+    public boolean test(DataType type, Object field, List<Object> literals) {
+        if (field != null) {
+            for (Object candidate : literals) {
+                if (candidate == null) {
+                    continue;
+                }
+                if (compareLiteral(type, candidate, field) == 0) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Summary-level check. Fails when the null count is known and equals the row count; otherwise
+     * passes as soon as one non-null literal compares {@code >= min} and {@code <= max}.
+     */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals) {
+        final boolean onlyNulls = nullCount != null && rowCount == nullCount;
+        if (onlyNulls) {
+            return false;
+        }
+        for (Object candidate : literals) {
+            if (candidate == null) {
+                continue;
+            }
+            // The upper bound is only compared once the lower bound has been satisfied.
+            final boolean notBelowMin = compareLiteral(type, candidate, min) >= 0;
+            if (notBelowMin && compareLiteral(type, candidate, max) <= 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Negating this function yields {@link NotIn}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(NotIn.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitIn} with the complete literal list. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitIn(fieldRef, literals);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java
new file mode 100644
index 000000000..6249306b5
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNotNull.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A {@link LeafUnaryFunction} to eval is not null. */
+public class IsNotNull extends LeafUnaryFunction {
+
+    public static final IsNotNull INSTANCE = new IsNotNull();
+
+    private IsNotNull() {}
+
+    /** Row-level check: passes for any non-null field value; {@code type} is not consulted. */
+    @Override
+    public boolean test(DataType type, Object field) {
+        return null != field;
+    }
+
+    /**
+     * Summary-level check: passes when the null count is unknown ({@code null}) or smaller than
+     * the row count.
+     */
+    @Override
+    public boolean test(DataType type, long rowCount, Object min, Object max, Long nullCount) {
+        if (nullCount == null) {
+            return true;
+        }
+        return nullCount < rowCount;
+    }
+
+    /** Negating this function yields {@link IsNull}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(IsNull.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitIsNotNull}; the literals are not used. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitIsNotNull(fieldRef);
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java
new file mode 100644
index 000000000..d97c8cb43
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/IsNull.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A {@link LeafUnaryFunction} to eval is null. */
+public class IsNull extends LeafUnaryFunction {
+
+    public static final IsNull INSTANCE = new IsNull();
+
+    private IsNull() {}
+
+    /** Row-level check: passes only for a null field value; {@code type} is not consulted. */
+    @Override
+    public boolean test(DataType type, Object field) {
+        return null == field;
+    }
+
+    /**
+     * Summary-level check: passes when the null count is unknown ({@code null}) or greater than
+     * zero.
+     */
+    @Override
+    public boolean test(DataType type, long rowCount, Object min, Object max, Long nullCount) {
+        if (nullCount == null) {
+            return true;
+        }
+        return nullCount > 0;
+    }
+
+    /** Negating this function yields {@link IsNotNull}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(IsNotNull.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitIsNull}; the literals are not used. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitIsNull(fieldRef);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java
new file mode 100644
index 000000000..a897c902e
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Function to test a field with literals.
+ *
+ * <p>Equality, hashing and the string form depend only on the concrete class, so two instances of
+ * the same function class are interchangeable.
+ */
+public abstract class LeafFunction implements Serializable {
+
+    /** Tests a single field value of the given type against the literals. */
+    public abstract boolean test(DataType type, Object field, List<Object> literals);
+
+    /**
+     * Tests summary values (row count, minimum, maximum and a nullable null count) against the
+     * literals instead of a single field value.
+     */
+    public abstract boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals);
+
+    /** Returns the negated function wrapped in an {@link Optional}. */
+    public abstract Optional<LeafFunction> negate();
+
+    /** Dispatches to the {@link FunctionVisitor} callback that matches this function. */
+    public abstract <T> T visit(
+            FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals);
+
+    /** Two functions are equal exactly when they are instances of the same class. */
+    @Override
+    public boolean equals(Object o) {
+        return this == o || (o != null && getClass() == o.getClass());
+    }
+
+    /** Hash of the fully qualified class name, in line with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        final String className = getClass().getName();
+        return className.hashCode();
+    }
+
+    /** Returns the simple class name. */
+    @Override
+    public String toString() {
+        return getClass().getSimpleName();
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java
new file mode 100644
index 000000000..343e6c0f3
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafPredicate.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.TimestampType;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Leaf node of a {@link Predicate} tree. Compares a field in the row with literals.
+ *
+ * <p>A leaf couples a {@link LeafFunction} with the field it applies to (index, name and {@link
+ * DataType}) and the literals that are handed to the function.
+ */
+public class LeafPredicate implements Predicate {
+
+    private static final long serialVersionUID = -9033842253303772188L;
+
+    private final LeafFunction function;
+    private final DataType type;
+    private final int fieldIndex;
+    private final String fieldName;
+
+    // Assigned only in the constructor, hence final like the other fields. The list is stored as
+    // given (no defensive copy) and is returned as-is by literals().
+    private final List<Object> literals;
+
+    /**
+     * Creates a leaf predicate.
+     *
+     * @param function function applied to the field value and the literals
+     * @param type data type of the field
+     * @param fieldIndex position of the field in the row
+     * @param fieldName name of the field
+     * @param literals literals passed to the function
+     */
+    public LeafPredicate(
+            LeafFunction function,
+            DataType type,
+            int fieldIndex,
+            String fieldName,
+            List<Object> literals) {
+        this.function = function;
+        this.type = type;
+        this.fieldIndex = fieldIndex;
+        this.fieldName = fieldName;
+        this.literals = literals;
+    }
+
+    /** Returns the function evaluated by this leaf. */
+    public LeafFunction function() {
+        return function;
+    }
+
+    /** Returns the data type of the referenced field. */
+    public DataType type() {
+        return type;
+    }
+
+    /** Returns the position of the referenced field in the row. */
+    public int index() {
+        return fieldIndex;
+    }
+
+    /** Returns the name of the referenced field. */
+    public String fieldName() {
+        return fieldName;
+    }
+
+    /** Returns a new {@link FieldRef} built from the field index, name and type. */
+    public FieldRef fieldRef() {
+        return new FieldRef(fieldIndex, fieldName, type);
+    }
+
+    /** Returns the literal list this predicate was created with. */
+    public List<Object> literals() {
+        return literals;
+    }
+
+    /** Returns a copy of this predicate that differs only in the field index. */
+    public LeafPredicate copyWithNewIndex(int fieldIndex) {
+        return new LeafPredicate(function, type, fieldIndex, fieldName, literals);
+    }
+
+    /** Reads the field from the row via {@link #get} and applies the function to it. */
+    @Override
+    public boolean test(InternalRow row) {
+        return function.test(type, get(row, fieldIndex, type), literals);
+    }
+
+    /**
+     * Returns this leaf with its function replaced by the function's negation, or an empty {@link
+     * Optional} when the function's {@code negate()} is empty.
+     */
+    @Override
+    public Optional<Predicate> negate() {
+        return function.negate()
+                .map(negated -> new LeafPredicate(negated, type, fieldIndex, fieldName, literals));
+    }
+
+    @Override
+    public <T> T visit(PredicateVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LeafPredicate that = (LeafPredicate) o;
+        return fieldIndex == that.fieldIndex
+                && Objects.equals(fieldName, that.fieldName)
+                && Objects.equals(function, that.function)
+                && Objects.equals(type, that.type)
+                && Objects.equals(literals, that.literals);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(function, type, fieldIndex, fieldName, literals);
+    }
+
+    /**
+     * Renders {@code Function(field)} when there are no literals, {@code Function(field, literal)}
+     * for a single literal and {@code Function(field, [l1, l2, ...])} otherwise.
+     */
+    @Override
+    public String toString() {
+        String literalsStr;
+        if (literals == null || literals.isEmpty()) {
+            literalsStr = "";
+        } else if (literals.size() == 1) {
+            literalsStr = Objects.toString(literals.get(0));
+        } else {
+            literalsStr = literals.toString();
+        }
+        return literalsStr.isEmpty()
+                ? function + "(" + fieldName + ")"
+                : function + "(" + fieldName + ", " + literalsStr + ")";
+    }
+
+    /**
+     * Reads the value at {@code pos} from the row with the accessor that matches the type root of
+     * {@code fieldType}.
+     *
+     * @param internalRow row to read from
+     * @param pos position of the field in the row
+     * @param fieldType data type of the field, used to select the accessor
+     * @return the field value, or {@code null} when the row reports null at {@code pos}
+     * @throws UnsupportedOperationException if the type root has no branch in the switch below
+     */
+    public static Object get(InternalRow internalRow, int pos, DataType fieldType) {
+        if (internalRow.isNullAt(pos)) {
+            return null;
+        }
+        switch (fieldType.getTypeRoot()) {
+            case BOOLEAN:
+                return internalRow.getBoolean(pos);
+            case TINYINT:
+                return internalRow.getByte(pos);
+            case SMALLINT:
+                return internalRow.getShort(pos);
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return internalRow.getInt(pos);
+            case BIGINT:
+                return internalRow.getLong(pos);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                TimestampType timestampType = (TimestampType) fieldType;
+                return internalRow.getTimestampNtz(pos, timestampType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // Local-zoned timestamps are read through the LTZ accessor; the NTZ accessor is
+                // reserved for TIMESTAMP_WITHOUT_TIME_ZONE above.
+                LocalZonedTimestampType lzTs = (LocalZonedTimestampType) fieldType;
+                return internalRow.getTimestampLtz(pos, lzTs.getPrecision());
+            case FLOAT:
+                return internalRow.getFloat(pos);
+            case DOUBLE:
+                return internalRow.getDouble(pos);
+            case CHAR:
+            case STRING:
+                return internalRow.getString(pos);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) fieldType;
+                return internalRow.getDecimal(
+                        pos, decimalType.getPrecision(), decimalType.getScale());
+            case BINARY:
+                return internalRow.getBytes(pos);
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java
new file mode 100644
index 000000000..b10fe40ff
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LeafUnaryFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Function to test a field.
+ *
+ * <p>Subclasses implement the literal-free {@code test} variants; the literal-taking variants
+ * inherited from {@link LeafFunction} forward to them and ignore the literals.
+ */
+public abstract class LeafUnaryFunction extends LeafFunction {
+
+    private static final long serialVersionUID = -155104972966998013L;
+
+    /** Forwards to {@link #test(DataType, Object)}; {@code literals} is ignored. */
+    @Override
+    public boolean test(DataType type, Object value, List<Object> literals) {
+        return test(type, value);
+    }
+
+    /**
+     * Forwards to {@link #test(DataType, long, Object, Object, Long)}; {@code literals} is
+     * ignored.
+     */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals) {
+        return test(type, rowCount, min, max, nullCount);
+    }
+
+    /** Tests a single field value of the given type. */
+    public abstract boolean test(DataType type, Object value);
+
+    /** Tests summary values (row count, minimum, maximum and a nullable null count). */
+    public abstract boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount);
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java
new file mode 100644
index 000000000..45dd3fda6
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessOrEqual.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link NullFalseLeafBinaryFunction} to eval less or equal.
+ *
+ * <p>Both checks pass when {@code compareLiteral} reports a non-negative result for the literal
+ * against the compared value.
+ */
+public class LessOrEqual extends NullFalseLeafBinaryFunction {
+
+    public static final LessOrEqual INSTANCE = new LessOrEqual();
+
+    private LessOrEqual() {}
+
+    /** Row-level check: compares the literal against the field value. */
+    @Override
+    public boolean test(DataType type, Object field, Object literal) {
+        final int literalVsField = compareLiteral(type, literal, field);
+        return literalVsField >= 0;
+    }
+
+    /** Summary-level check: only {@code min} is consulted. */
+    @Override
+    public boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) {
+        final int literalVsMin = compareLiteral(type, literal, min);
+        return literalVsMin >= 0;
+    }
+
+    /** Negating this function yields {@link GreaterThan}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(GreaterThan.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitLessOrEqual} with the first literal. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        final Object literal = literals.get(0);
+        return visitor.visitLessOrEqual(fieldRef, literal);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java
new file mode 100644
index 000000000..ed1ee70c3
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/LessThan.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link NullFalseLeafBinaryFunction} to eval less than.
+ *
+ * <p>Both checks pass when {@code compareLiteral} reports a strictly positive result for the
+ * literal against the compared value.
+ */
+public class LessThan extends NullFalseLeafBinaryFunction {
+
+    public static final LessThan INSTANCE = new LessThan();
+
+    private LessThan() {}
+
+    /** Row-level check: compares the literal against the field value. */
+    @Override
+    public boolean test(DataType type, Object field, Object literal) {
+        final int literalVsField = compareLiteral(type, literal, field);
+        return literalVsField > 0;
+    }
+
+    /** Summary-level check: only {@code min} is consulted. */
+    @Override
+    public boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) {
+        final int literalVsMin = compareLiteral(type, literal, min);
+        return literalVsMin > 0;
+    }
+
+    /** Negating this function yields {@link GreaterOrEqual}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(GreaterOrEqual.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitLessThan} with the first literal. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        final Object literal = literals.get(0);
+        return visitor.visitLessThan(fieldRef, literal);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java
new file mode 100644
index 000000000..bca203a22
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotEqual.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A {@link NullFalseLeafBinaryFunction} to eval not equal. */
+public class NotEqual extends NullFalseLeafBinaryFunction {
+
+    public static final NotEqual INSTANCE = new NotEqual();
+
+    private NotEqual() {}
+
+    /** Row-level check: passes when {@code compareLiteral} does not return 0 for the field. */
+    @Override
+    public boolean test(DataType type, Object field, Object literal) {
+        final int literalVsField = compareLiteral(type, literal, field);
+        return literalVsField != 0;
+    }
+
+    /**
+     * Summary-level check: fails only when the literal compares equal to both {@code min} and
+     * {@code max}.
+     */
+    @Override
+    public boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal) {
+        // max is only compared when the literal already equals min.
+        final boolean equalsMin = compareLiteral(type, literal, min) == 0;
+        return !(equalsMin && compareLiteral(type, literal, max) == 0);
+    }
+
+    /** Negating this function yields {@link Equal}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(Equal.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitNotEqual} with the first literal. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        final Object literal = literals.get(0);
+        return visitor.visitNotEqual(fieldRef, literal);
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java
new file mode 100644
index 000000000..3bad7e56e
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NotIn.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.alibaba.fluss.predicate.CompareUtils.compareLiteral;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link LeafFunction} to eval not in: the test fails as soon as a literal is null or compares
+ * equal to the field.
+ */
+public class NotIn extends LeafFunction {
+
+    private static final long serialVersionUID = 8953845894700582887L;
+    public static final NotIn INSTANCE = new NotIn();
+
+    private NotIn() {}
+
+    /**
+     * Row-level check. Fails for a null field, for any null literal, and for any literal that
+     * {@code compareLiteral} reports as equal to the field; passes otherwise.
+     */
+    @Override
+    public boolean test(DataType type, Object field, List<Object> literals) {
+        if (field == null) {
+            return false;
+        }
+        for (Object candidate : literals) {
+            final boolean rejects =
+                    candidate == null || compareLiteral(type, candidate, field) == 0;
+            if (rejects) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Summary-level check. Fails when the null count is known and equals the row count, when any
+     * literal is null, or when a literal compares equal to both {@code min} and {@code max};
+     * passes otherwise.
+     */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals) {
+        final boolean onlyNulls = nullCount != null && rowCount == nullCount;
+        if (onlyNulls) {
+            return false;
+        }
+        for (Object candidate : literals) {
+            if (candidate == null) {
+                return false;
+            }
+            // max is only compared when the literal already equals min.
+            if (compareLiteral(type, candidate, min) == 0
+                    && compareLiteral(type, candidate, max) == 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Negating this function yields {@link In}. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.of(In.INSTANCE);
+    }
+
+    /** Dispatches to {@link FunctionVisitor#visitNotIn} with the complete literal list. */
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitNotIn(fieldRef, literals);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java
new file mode 100644
index 000000000..2dfdf22d4
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/NullFalseLeafBinaryFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Function to test a field with a literal.
+ *
+ * <p>Only the first entry of the literal list is used. Both list-taking {@code test} variants
+ * return {@code false} without consulting the subclass when that literal is null, so subclasses
+ * never receive a null literal through them.
+ */
+public abstract class NullFalseLeafBinaryFunction extends LeafFunction {
+
+    private static final long serialVersionUID = 5617091663961558170L;
+
+    /** Tests a single field value against one literal. */
+    public abstract boolean test(DataType type, Object field, Object literal);
+
+    /**
+     * Tests summary values (row count, minimum, maximum and a nullable null count) against one
+     * literal.
+     */
+    public abstract boolean test(
+            DataType type, long rowCount, Object min, Object max, Long nullCount, Object literal);
+
+    /**
+     * Row-level check: returns {@code false} when the field or the first literal is null,
+     * otherwise delegates to {@link #test(DataType, Object, Object)}.
+     */
+    @Override
+    public boolean test(DataType type, Object field, List<Object> literals) {
+        if (field == null) {
+            return false;
+        }
+        Object literal = literals.get(0);
+        if (literal == null) {
+            return false;
+        }
+        return test(type, field, literal);
+    }
+
+    /**
+     * Summary-level check: returns {@code false} when the null count is known and equals the row
+     * count, or when the first literal is null; otherwise delegates to the single-literal variant.
+     */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            List<Object> literals) {
+        // Checked first so that this case still returns without touching the literal list.
+        if (nullCount != null && rowCount == nullCount) {
+            return false;
+        }
+        // A null literal is rejected even when the null count is unknown, matching the row-level
+        // check above; previously it was only rejected when nullCount was non-null.
+        Object literal = literals.get(0);
+        if (literal == null) {
+            return false;
+        }
+        return test(type, rowCount, min, max, nullCount, literal);
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java
new file mode 100644
index 000000000..744717435
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Or.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * The logical OR {@link CompoundPredicate.Function}: satisfied as soon as one child predicate is
+ * satisfied.
+ */
+public class Or extends CompoundPredicate.Function {
+
+    private static final long serialVersionUID = -2110346319473699418L;
+
+    public static final Or INSTANCE = new Or();
+
+    private Or() {}
+
+    /** Returns {@code true} if at least one child accepts the row; children are tried in order. */
+    @Override
+    public boolean test(InternalRow row, List<Predicate> children) {
+        return children.stream().anyMatch(child -> child.test(row));
+    }
+
+    /**
+     * Negates the disjunction as an {@link And} over the negated children. Yields an empty {@link
+     * Optional} as soon as one child has no negation.
+     */
+    @Override
+    public Optional<Predicate> negate(List<Predicate> children) {
+        List<Predicate> inverted = new ArrayList<>();
+        for (Predicate child : children) {
+            Optional<Predicate> inverse = child.negate();
+            if (!inverse.isPresent()) {
+                return Optional.empty();
+            }
+            inverted.add(inverse.get());
+        }
+        return Optional.of(new CompoundPredicate(And.INSTANCE, inverted));
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, List<T> children) {
+        return visitor.visitOr(children);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java
new file mode 100644
index 000000000..6318aba9c
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PartitionPredicateVisitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import java.util.List;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link PredicateVisitor} that reports whether every leaf of a predicate tree refers to one of
+ * the given partition keys.
+ */
+public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> {
+
+    private final List<String> partitionKeys;
+
+    public PartitionPredicateVisitor(List<String> partitionKeys) {
+        this.partitionKeys = partitionKeys;
+    }
+
+    /** A leaf matches when its field name is one of the partition keys. */
+    @Override
+    public Boolean visit(LeafPredicate predicate) {
+        String fieldName = predicate.fieldName();
+        return partitionKeys.contains(fieldName);
+    }
+
+    /** A compound matches only if all of its children match; stops at the first mismatch. */
+    @Override
+    public Boolean visit(CompoundPredicate predicate) {
+        return predicate.children().stream().allMatch(child -> child.visit(this));
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java
new file mode 100644
index 000000000..7083a33d5
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/Predicate.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A serializable boolean condition that can be evaluated against an {@link InternalRow}.
+ *
+ * @see PredicateBuilder
+ * @since 0.4.0
+ */
+public interface Predicate extends Serializable {
+
+    /**
+     * Tests this predicate against the given input row.
+     *
+     * <p>Only row-based testing is supported for now. TODO: add a statistics-based variant, e.g.
+     * {@code boolean test(long rowCount, InternalRow minValues, InternalRow maxValues,
+     * InternalArray nullCounts)}.
+     *
+     * @param row the row to test
+     * @return {@code true} when the row is a hit, {@code false} otherwise
+     */
+    boolean test(InternalRow row);
+
+    /** @return the negation predicate of this predicate if possible. */
+    Optional<Predicate> negate();
+
+    /**
+     * Dispatches this predicate to the given visitor.
+     *
+     * @param visitor the visitor to apply
+     * @param <T> the visitor's result type
+     * @return the value produced by the visitor
+     */
+    <T> T visit(PredicateVisitor<T> visitor);
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java
new file mode 100644
index 000000000..975954c2a
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.DataField;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.utils.TypeUtils;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static java.util.Collections.singletonList;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A utility class to create {@link Predicate} object for common filter 
conditions.
+ *
+ * @since 0.4.0
+ */
+public class PredicateBuilder {
+
+    private final RowType rowType;
+    private final List<String> fieldNames;
+
+    /**
+     * Creates a builder for predicates over the fields of the given row type. The field names are
+     * read once from {@code rowType} and kept for {@link #indexOf(String)}.
+     *
+     * @param rowType row type whose fields the predicates refer to
+     */
+    public PredicateBuilder(RowType rowType) {
+        this.rowType = rowType;
+        this.fieldNames = rowType.getFieldNames();
+    }
+
+    /**
+     * Returns the position of the named field in the row type.
+     *
+     * @param field field name to look up
+     * @return the index of the field, or {@code -1} if the row type has no field of that name
+     */
+    public int indexOf(String field) {
+        return fieldNames.indexOf(field);
+    }
+
+    /** Builds a leaf predicate applying {@link Equal} to the field at {@code idx} and the literal. */
+    public Predicate equal(int idx, Object literal) {
+        return leaf(Equal.INSTANCE, idx, literal);
+    }
+
+    /** Builds a leaf predicate applying {@link NotEqual} to the field at {@code idx}. */
+    public Predicate notEqual(int idx, Object literal) {
+        return leaf(NotEqual.INSTANCE, idx, literal);
+    }
+
+    /** Builds a leaf predicate applying {@link LessThan} to the field at {@code idx}. */
+    public Predicate lessThan(int idx, Object literal) {
+        return leaf(LessThan.INSTANCE, idx, literal);
+    }
+
+    /** Builds a leaf predicate applying {@link LessOrEqual} to the field at {@code idx}. */
+    public Predicate lessOrEqual(int idx, Object literal) {
+        return leaf(LessOrEqual.INSTANCE, idx, literal);
+    }
+
+    /** Builds a leaf predicate applying {@link GreaterThan} to the field at {@code idx}. */
+    public Predicate greaterThan(int idx, Object literal) {
+        return leaf(GreaterThan.INSTANCE, idx, literal);
+    }
+
+    /** Builds a leaf predicate applying {@link GreaterOrEqual} to the field at {@code idx}. */
+    public Predicate greaterOrEqual(int idx, Object literal) {
+        return leaf(GreaterOrEqual.INSTANCE, idx, literal);
+    }
+
+    /** Builds a literal-free leaf predicate applying {@link IsNull} to the field at {@code idx}. */
+    public Predicate isNull(int idx) {
+        return leaf(IsNull.INSTANCE, idx);
+    }
+
+    /** Builds a literal-free leaf predicate applying {@link IsNotNull} to the field at {@code idx}. */
+    public Predicate isNotNull(int idx) {
+        return leaf(IsNotNull.INSTANCE, idx);
+    }
+
+    /** Builds a leaf predicate applying {@link StartsWith} with the given pattern literal. */
+    public Predicate startsWith(int idx, Object patternLiteral) {
+        return leaf(StartsWith.INSTANCE, idx, patternLiteral);
+    }
+
+    /** Builds a leaf predicate applying {@link EndsWith} with the given pattern literal. */
+    public Predicate endsWith(int idx, Object patternLiteral) {
+        return leaf(EndsWith.INSTANCE, idx, patternLiteral);
+    }
+
+    /** Builds a leaf predicate applying {@link Contains} with the given pattern literal. */
+    public Predicate contains(int idx, Object patternLiteral) {
+        return leaf(Contains.INSTANCE, idx, patternLiteral);
+    }
+
+    /**
+     * Builds a {@link LeafPredicate} that applies a binary function to one field and one literal.
+     *
+     * @param function the function to apply
+     * @param idx position of the field in the row type
+     * @param literal the single literal handed to the function
+     * @return the leaf predicate
+     * @throws UnsupportedExpression if {@code idx} is outside the row type's field range
+     */
+    public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
+        validateIndex(idx);
+        DataField target = rowType.getFields().get(idx);
+        DataType fieldType = target.getType();
+        String fieldName = target.getName();
+        return new LeafPredicate(function, fieldType, idx, fieldName, singletonList(literal));
+    }
+
+    /**
+     * Builds a {@link LeafPredicate} that applies a unary function to one field, with no literals.
+     *
+     * @param function the function to apply
+     * @param idx position of the field in the row type
+     * @return the leaf predicate
+     * @throws UnsupportedExpression if {@code idx} is outside the row type's field range
+     */
+    public Predicate leaf(LeafUnaryFunction function, int idx) {
+        validateIndex(idx);
+        DataField target = rowType.getFields().get(idx);
+        DataType fieldType = target.getType();
+        String fieldName = target.getName();
+        return new LeafPredicate(function, fieldType, idx, fieldName, Collections.emptyList());
+    }
+
+    /**
+     * Builds an IN predicate for the field at {@code idx}.
+     *
+     * @param idx position of the field in the row type
+     * @param literals the candidate values
+     * @return an OR of equality predicates for up to 20 literals, otherwise a single {@link In}
+     *     leaf predicate
+     * @throws UnsupportedExpression if {@code idx} is outside the row type's field range
+     */
+    public Predicate in(int idx, List<Object> literals) {
+        validateIndex(idx);
+        // 20 literals is the performance threshold: up to that many are expanded into OR-ed
+        // equality predicates, larger lists are kept as one In leaf.
+        if (literals.size() <= 20) {
+            List<Predicate> equalities =
+                    literals.stream()
+                            .map(literal -> equal(idx, literal))
+                            .collect(Collectors.toList());
+            return or(equalities);
+        }
+
+        DataField field = rowType.getFields().get(idx);
+        return new LeafPredicate(In.INSTANCE, field.getType(), idx, field.getName(), literals);
+    }
+
+    /**
+     * Builds a NOT IN predicate by negating the result of {@link #in(int, List)}.
+     *
+     * <p>NOTE(review): the {@code Optional} returned by {@code negate()} is unwrapped without a
+     * check; this assumes every predicate produced by {@code in} can be negated - confirm.
+     */
+    public Predicate notIn(int idx, List<Object> literals) {
+        return in(idx, literals).negate().get();
+    }
+
+    /**
+     * Builds a closed-range predicate: the field at {@code idx} must be greater than or equal to
+     * the lower bound and less than or equal to the upper bound.
+     *
+     * @param idx position of the field in the row type
+     * @param includedLowerBound inclusive lower bound
+     * @param includedUpperBound inclusive upper bound
+     * @return an {@link And} compound of the two bound checks
+     */
+    public Predicate between(int idx, Object includedLowerBound, Object includedUpperBound) {
+        Predicate lowerCheck = greaterOrEqual(idx, includedLowerBound);
+        Predicate upperCheck = lessOrEqual(idx, includedUpperBound);
+        return new CompoundPredicate(And.INSTANCE, Arrays.asList(lowerCheck, upperCheck));
+    }
+
+    /** Varargs convenience for {@link #and(List)}. */
+    public static Predicate and(Predicate... predicates) {
+        return and(Arrays.asList(predicates));
+    }
+
+    /**
+     * Combines the predicates with AND. A single predicate is returned as is; several are folded
+     * from left to right into nested two-child {@link And} compounds.
+     *
+     * @param predicates the predicates to combine, at least one
+     * @return the combined predicate
+     * @throws IllegalArgumentException presumably, via {@code checkArgument}, if the list is empty
+     *     - confirm against {@code Preconditions}
+     */
+    public static Predicate and(List<Predicate> predicates) {
+        checkArgument(
+                !predicates.isEmpty(),
+                "There must be at least 1 inner predicate to construct an AND predicate");
+        Predicate combined = predicates.get(0);
+        for (int i = 1; i < predicates.size(); i++) {
+            combined =
+                    new CompoundPredicate(
+                            And.INSTANCE, Arrays.asList(combined, predicates.get(i)));
+        }
+        return combined;
+    }
+
+    /** Varargs convenience for {@link #andNullable(List)}. */
+    @Nullable
+    public static Predicate andNullable(Predicate... predicates) {
+        return andNullable(Arrays.asList(predicates));
+    }
+
+    /**
+     * Combines the non-null entries of the list with AND.
+     *
+     * @param predicates predicates to combine; {@code null} entries are ignored
+     * @return the combined predicate, or {@code null} if no non-null predicate is left
+     */
+    @Nullable
+    public static Predicate andNullable(List<Predicate> predicates) {
+        List<Predicate> present = new ArrayList<>();
+        for (Predicate candidate : predicates) {
+            if (candidate != null) {
+                present.add(candidate);
+            }
+        }
+        return present.isEmpty() ? null : and(present);
+    }
+
+    /** Varargs convenience for {@link #or(List)}. */
+    public static Predicate or(Predicate... predicates) {
+        return or(Arrays.asList(predicates));
+    }
+
+    /**
+     * Combines the predicates with OR by folding them from left to right into nested two-child
+     * {@link Or} compounds.
+     *
+     * @param predicates the predicates to combine, at least one
+     * @return the combined predicate
+     */
+    public static Predicate or(List<Predicate> predicates) {
+        checkArgument(
+                !predicates.isEmpty(),
+                "There must be at least 1 inner predicate to construct an OR predicate");
+        Optional<Predicate> combined =
+                predicates.stream()
+                        .reduce(
+                                (left, right) ->
+                                        new CompoundPredicate(
+                                                Or.INSTANCE, Arrays.asList(left, right)));
+        return combined.get();
+    }
+
+    /**
+     * Flattens nested AND compounds into the list of their non-AND parts.
+     *
+     * @param predicate the predicate to split, may be {@code null}
+     * @return the parts in traversal order; an empty list for {@code null}
+     */
+    public static List<Predicate> splitAnd(@Nullable Predicate predicate) {
+        if (predicate != null) {
+            List<Predicate> parts = new ArrayList<>();
+            splitCompound(And.INSTANCE, predicate, parts);
+            return parts;
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Flattens nested OR compounds into the list of their non-OR parts.
+     *
+     * @param predicate the predicate to split, may be {@code null}
+     * @return the parts in traversal order; an empty list for {@code null}
+     */
+    public static List<Predicate> splitOr(@Nullable Predicate predicate) {
+        if (predicate != null) {
+            List<Predicate> parts = new ArrayList<>();
+            splitCompound(Or.INSTANCE, predicate, parts);
+            return parts;
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Recursively descends through compounds that use {@code function} and appends every other
+     * predicate encountered to {@code result}.
+     */
+    private static void splitCompound(
+            CompoundPredicate.Function function, Predicate predicate, List<Predicate> result) {
+        boolean sameFunction =
+                predicate instanceof CompoundPredicate
+                        && ((CompoundPredicate) predicate).function().equals(function);
+        if (!sameFunction) {
+            result.add(predicate);
+            return;
+        }
+        for (Predicate child : ((CompoundPredicate) predicate).children()) {
+            splitCompound(function, child, result);
+        }
+    }
+
+    /**
+     * Ensures {@code idx} addresses a field of the row type.
+     *
+     * @throws UnsupportedExpression if {@code idx} is negative or not below the field count
+     */
+    private void validateIndex(int idx) {
+        boolean withinBounds = idx >= 0 && idx < rowType.getFieldCount();
+        if (!withinBounds) {
+            throw new UnsupportedExpression("idx is not valid");
+        }
+    }
+
+    /**
+     * Converts a Java literal into the value used for predicate literals of the given type.
+     *
+     * <ul>
+     *   <li>BOOLEAN: returned unchanged.
+     *   <li>TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE: the literal is cast to {@link Number}
+     *       and narrowed or widened to the matching primitive wrapper.
+     *   <li>STRING: {@code BinaryString} built from {@code o.toString()}.
+     *   <li>DATE: {@code int} number of days since 1970-01-01.
+     *   <li>TIME_WITHOUT_TIME_ZONE: {@code int} milliseconds of the day.
+     *   <li>DECIMAL: {@code Decimal} with the precision and scale of the type.
+     *   <li>TIMESTAMP_WITHOUT_TIME_ZONE: {@code TimestampNtz}; {@code Timestamp} and {@code
+     *       Instant} inputs are interpreted in the system default time zone.
+     *   <li>TIMESTAMP_WITH_LOCAL_TIME_ZONE: {@code TimestampLtz}.
+     * </ul>
+     *
+     * @param literalType the type of the field the literal is compared with
+     * @param o the Java literal, may be {@code null}
+     * @return the converted literal, or {@code null} if {@code o} is {@code null}
+     * @throws UnsupportedOperationException if the type root is not handled or the literal's class
+     *     is not accepted for a date/time type
+     * @throws ClassCastException if a numeric literal is not a {@link Number} or a decimal literal
+     *     is not a {@link BigDecimal}
+     */
+    public static Object convertJavaObject(DataType literalType, Object o) {
+        if (o == null) {
+            return null;
+        }
+        switch (literalType.getTypeRoot()) {
+            case BOOLEAN:
+                return o;
+            case BIGINT:
+                return ((Number) o).longValue();
+            case DOUBLE:
+                return ((Number) o).doubleValue();
+            case TINYINT:
+                return ((Number) o).byteValue();
+            case SMALLINT:
+                return ((Number) o).shortValue();
+            case INTEGER:
+                return ((Number) o).intValue();
+            case FLOAT:
+                return ((Number) o).floatValue();
+            case STRING:
+                return BinaryString.fromString(o.toString());
+            case DATE:
+                // Hive converts a literal to Date with `java.sql.Date.valueOf(lit.toString())`,
+                // which internally goes through `java.util.Date` and the default time zone
+                // (TimeZone.getDefaultRef()). Going through LocalDate avoids the time zone
+                // problem because it builds the value from year/month/day only.
+                LocalDate localDate;
+                if (o instanceof java.sql.Timestamp) {
+                    localDate = ((java.sql.Timestamp) 
o).toLocalDateTime().toLocalDate();
+                } else if (o instanceof Date) {
+                    localDate = ((Date) o).toLocalDate();
+                } else if (o instanceof LocalDate) {
+                    localDate = (LocalDate) o;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unexpected date literal of class " + 
o.getClass().getName());
+                }
+                LocalDate epochDay =
+                        
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate();
+                return (int) ChronoUnit.DAYS.between(epochDay, localDate);
+            case TIME_WITHOUT_TIME_ZONE:
+                LocalTime localTime;
+                if (o instanceof java.sql.Time) {
+                    localTime = ((java.sql.Time) o).toLocalTime();
+                } else if (o instanceof LocalTime) {
+                    localTime = (LocalTime) o;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unexpected time literal of class " + 
o.getClass().getName());
+                }
+                // Nanoseconds of the day are truncated to milliseconds of the day.
+                return (int) (localTime.toNanoOfDay() / 1_000_000);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) literalType;
+                int precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                return Decimal.fromBigDecimal((BigDecimal) o, precision, 
scale);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                if (o instanceof java.sql.Timestamp) {
+                    LocalDateTime localDateTime =
+                            ((Timestamp) o)
+                                    .toInstant()
+                                    .atZone(ZoneId.systemDefault())
+                                    .toLocalDateTime();
+                    return TimestampNtz.fromLocalDateTime(localDateTime);
+                } else if (o instanceof Instant) {
+                    Instant o1 = (Instant) o;
+                    LocalDateTime dateTime = 
o1.atZone(ZoneId.systemDefault()).toLocalDateTime();
+                    return TimestampNtz.fromLocalDateTime(dateTime);
+                } else if (o instanceof LocalDateTime) {
+                    return TimestampNtz.fromLocalDateTime((LocalDateTime) o);
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Unsupported class %s for timestamp 
without timezone ",
+                                    o.getClass()));
+                }
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (o instanceof java.sql.Timestamp) {
+                    java.sql.Timestamp timestamp = (java.sql.Timestamp) o;
+                    return TimestampLtz.fromInstant(timestamp.toInstant());
+                } else if (o instanceof Instant) {
+                    return TimestampLtz.fromInstant((Instant) o);
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Unsupported class %s for timestamp with 
local time zone ",
+                                    o.getClass()));
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported predicate leaf type " + 
literalType.getTypeRoot().name());
+        }
+    }
+
+    /**
+     * Re-indexes predicates written against {@code inputFields} so that they address {@code
+     * pickedFields}, dropping predicates that touch a field that was not picked.
+     *
+     * @param predicates predicates over the input fields
+     * @param inputFields names of the input fields, in index order
+     * @param pickedFields names of the picked fields, in their new index order
+     * @return the predicates that could be re-indexed
+     */
+    public static List<Predicate> pickTransformFieldMapping(
+            List<Predicate> predicates, List<String> inputFields, List<String> pickedFields) {
+        // mapping[i] is the new index of input field i, or -1 if that field was not picked.
+        int[] mapping = inputFields.stream().mapToInt(pickedFields::indexOf).toArray();
+        return pickTransformFieldMapping(predicates, mapping);
+    }
+
+    /**
+     * Applies {@link #transformFieldMapping(Predicate, int[])} to every predicate and keeps only
+     * those for which a mapped predicate exists.
+     *
+     * @param predicates predicates to re-index
+     * @param fieldIdxMapping new index per old field index; a negative entry means "not mapped"
+     * @return the re-indexed predicates, in input order
+     */
+    public static List<Predicate> pickTransformFieldMapping(
+            List<Predicate> predicates, int[] fieldIdxMapping) {
+        List<Predicate> kept = new ArrayList<>();
+        for (Predicate candidate : predicates) {
+            Optional<Predicate> remapped = transformFieldMapping(candidate, fieldIdxMapping);
+            if (remapped.isPresent()) {
+                kept.add(remapped.get());
+            }
+        }
+        return kept;
+    }
+
+    /**
+     * Rewrites the field indexes of a predicate tree according to {@code fieldIdxMapping}.
+     *
+     * @param predicate the predicate to rewrite; anything that is not a {@link CompoundPredicate}
+     *     is treated as a {@link LeafPredicate}
+     * @param fieldIdxMapping new index per old field index; a negative entry means "not mapped"
+     * @return the rewritten predicate, or empty if any leaf refers to an unmapped field
+     */
+    public static Optional<Predicate> transformFieldMapping(
+            Predicate predicate, int[] fieldIdxMapping) {
+        if (!(predicate instanceof CompoundPredicate)) {
+            LeafPredicate leaf = (LeafPredicate) predicate;
+            int target = fieldIdxMapping[leaf.index()];
+            if (target < 0) {
+                return Optional.empty();
+            }
+            return Optional.of(
+                    new LeafPredicate(
+                            leaf.function(),
+                            leaf.type(),
+                            target,
+                            leaf.fieldName(),
+                            leaf.literals()));
+        }
+
+        CompoundPredicate compound = (CompoundPredicate) predicate;
+        List<Predicate> rewritten = new ArrayList<>();
+        for (Predicate child : compound.children()) {
+            Optional<Predicate> mappedChild = transformFieldMapping(child, fieldIdxMapping);
+            if (!mappedChild.isPresent()) {
+                // One unmapped child makes the whole compound unmappable.
+                return Optional.empty();
+            }
+            rewritten.add(mappedChild.get());
+        }
+        return Optional.of(new CompoundPredicate(compound.function(), rewritten));
+    }
+
+    /**
+     * Checks whether any leaf of the predicate tree refers to one of the given field names.
+     *
+     * @param predicate the predicate to inspect; anything that is not a {@link CompoundPredicate}
+     *     is treated as a {@link LeafPredicate}
+     * @param fields field names to look for
+     * @return {@code true} if at least one leaf uses one of the fields
+     */
+    public static boolean containsFields(Predicate predicate, Set<String> fields) {
+        if (!(predicate instanceof CompoundPredicate)) {
+            String fieldName = ((LeafPredicate) predicate).fieldName();
+            return fields.contains(fieldName);
+        }
+        for (Predicate child : ((CompoundPredicate) predicate).children()) {
+            if (containsFields(child, fields)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Removes every predicate that refers to at least one of the given fields.
+     *
+     * @param predicates the predicates to filter, may be {@code null}
+     * @param fields names of the fields whose predicates are dropped
+     * @return the input itself (possibly {@code null}) when {@code predicates} is {@code null} or
+     *     empty or {@code fields} is empty; otherwise a new list without the predicates that use
+     *     any of the fields
+     */
+    @Nullable
+    public static List<Predicate> excludePredicateWithFields(
+            @Nullable List<Predicate> predicates, Set<String> fields) {
+        // Nothing to filter: hand the input back unchanged, including a null input.
+        if (predicates == null || predicates.isEmpty() || fields.isEmpty()) {
+            return predicates;
+        }
+        return predicates.stream()
+                .filter(f -> !containsFields(f, fields))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Builds the predicate that matches one partition spec: an AND over one check per entry, where
+     * an entry whose value equals {@code defaultPartValue} becomes an IS NULL check and any other
+     * entry becomes an equality check on the converted value.
+     *
+     * @param map partition spec, field name to string value
+     * @param rowType row type providing the field positions and types
+     * @param defaultPartValue the string that stands for a null partition value
+     * @return the combined predicate, or {@code null} if the spec has no entries
+     */
+    @Nullable
+    public static Predicate partition(
+            Map<String, String> map, RowType rowType, String defaultPartValue) {
+        Map<String, Object> converted = convertSpecToInternal(map, rowType, defaultPartValue);
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate combined = null;
+        for (Map.Entry<String, Object> entry : converted.entrySet()) {
+            int idx = builder.indexOf(entry.getKey());
+            Object literal = entry.getValue();
+            Predicate current = literal == null ? builder.isNull(idx) : builder.equal(idx, literal);
+            combined = combined == null ? current : PredicateBuilder.and(combined, current);
+        }
+        return combined;
+    }
+
+    /**
+     * Builds the predicate that matches any of the given partition specs: an OR over the
+     * per-partition predicates produced by {@link #partition(Map, RowType, String)}.
+     *
+     * @param partitions partition specs, at least one
+     * @param rowType row type providing the field positions and types
+     * @param defaultPartValue the string that stands for a null partition value
+     * @return the combined predicate
+     */
+    public static Predicate partitions(
+            List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) {
+        List<Predicate> perPartition = new ArrayList<>(partitions.size());
+        for (Map<String, String> spec : partitions) {
+            perPartition.add(PredicateBuilder.partition(spec, rowType, defaultPartValue));
+        }
+        return PredicateBuilder.or(perPartition);
+    }
+
+    /**
+     * Converts the string values of a partition spec into values of the partition field types.
+     *
+     * @param spec partition spec, field name to string value
+     * @param partType row type used to look up each field's type by name
+     * @param defaultPartValue the string that stands for a null partition value
+     * @return the converted values in the iteration order of {@code spec}; entries whose value
+     *     equals {@code defaultPartValue} map to {@code null}
+     */
+    public static Map<String, Object> convertSpecToInternal(
+            Map<String, String> spec, RowType partType, String defaultPartValue) {
+        Map<String, Object> converted = new LinkedHashMap<>();
+        for (Map.Entry<String, String> entry : spec.entrySet()) {
+            String fieldName = entry.getKey();
+            String rawValue = entry.getValue();
+            Object value = null;
+            if (!defaultPartValue.equals(rawValue)) {
+                value = TypeUtils.castFromString(rawValue, partType.getField(fieldName).getType());
+            }
+            converted.put(fieldName, value);
+        }
+        return converted;
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java
new file mode 100644
index 000000000..3e20e9c92
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateVisitor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A visitor over the two kinds of {@link Predicate}: leaves and compounds.
+ *
+ * @param <T> the type of the value produced by a visit
+ */
+public interface PredicateVisitor<T> {
+
+    /** Visits a leaf predicate and returns the value computed for it. */
+    T visit(LeafPredicate predicate);
+
+    /** Visits a compound predicate and returns the value computed for it. */
+    T visit(CompoundPredicate predicate);
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java
new file mode 100644
index 000000000..e3baa41fa
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.types.DataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * A {@link NullFalseLeafBinaryFunction} to evaluate {@code filter like 'abc%' or filter like
+ * 'abc_'}.
+ */
+public class StartsWith extends NullFalseLeafBinaryFunction {
+
+    public static final StartsWith INSTANCE = new StartsWith();
+
+    private StartsWith() {}
+
+    /**
+     * Tests whether the string form of the field starts with the string form of the pattern.
+     *
+     * @param type data type of the field (not used)
+     * @param field the field value
+     * @param patternLiteral the prefix to look for
+     * @return {@code true} if the field starts with the pattern
+     */
+    @Override
+    public boolean test(DataType type, Object field, Object patternLiteral) {
+        // Go through toString() rather than casting to String: string literals converted by
+        // PredicateBuilder.convertJavaObject are BinaryString, and a cast would fail with a
+        // ClassCastException. This also matches the statistics-based test below.
+        String fieldString = field.toString();
+        return fieldString.startsWith(patternLiteral.toString());
+    }
+
+    /**
+     * Tests whether a value with the given prefix can lie between {@code min} and {@code max},
+     * comparing the string forms of all three.
+     *
+     * @param type data type of the field (not used)
+     * @param rowCount number of rows covered by the statistics (not used)
+     * @param min minimum value from the statistics
+     * @param max maximum value from the statistics
+     * @param nullCount number of null values (not used)
+     * @param patternLiteral the prefix to look for
+     * @return {@code true} if {@code min} starts with or sorts at or before the pattern, and
+     *     {@code max} starts with or sorts at or after the pattern
+     */
+    @Override
+    public boolean test(
+            DataType type,
+            long rowCount,
+            Object min,
+            Object max,
+            Long nullCount,
+            Object patternLiteral) {
+        String minStr = min.toString();
+        String maxStr = max.toString();
+        String pattern = patternLiteral.toString();
+        return (minStr.startsWith(pattern) || minStr.compareTo(pattern) <= 0)
+                && (maxStr.startsWith(pattern) || maxStr.compareTo(pattern) >= 0);
+    }
+
+    /** This function has no negated counterpart, so the result is always empty. */
+    @Override
+    public Optional<LeafFunction> negate() {
+        return Optional.empty();
+    }
+
+    @Override
+    public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, List<Object> literals) {
+        return visitor.visitStartsWith(fieldRef, literals.get(0));
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/predicate/UnsupportedExpression.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/UnsupportedExpression.java
new file mode 100644
index 000000000..74b50f2f4
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/predicate/UnsupportedExpression.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+/**
+ * Thrown when an unsupported expression is encountered; the caller can choose to ignore the
+ * affected filter branch.
+ */
+public class UnsupportedExpression extends RuntimeException {
+    /** Creates the exception with a message describing the unsupported expression. */
+    public UnsupportedExpression(String message) {
+        super(message);
+    }
+
+    /** Creates the exception without a message. */
+    public UnsupportedExpression() {
+        super();
+    }
+}
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java 
b/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java
index 425074482..840d0a86b 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/types/RowType.java
@@ -255,4 +255,14 @@ public final class RowType extends DataType {
             return new RowType(isNullable, fields);
         }
     }
+
+    /**
+     * Looks up a field of this row type by its name.
+     *
+     * @param fieldName the name of the field to find
+     * @return the first field whose name equals {@code fieldName}
+     * @throws IllegalArgumentException if no field carries that name
+     */
+    public DataField getField(String fieldName) {
+        for (DataField field : fields) {
+            if (field.getName().equals(fieldName)) {
+                return field;
+            }
+        }
+
+        // An unknown name is a bad argument; IllegalArgumentException is still a
+        // RuntimeException, so callers catching the broader type keep working.
+        throw new IllegalArgumentException("Cannot find field: " + fieldName);
+    }
 }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java 
b/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java
index ea4ad277b..78c792920 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/BinaryStringUtils.java
@@ -18,10 +18,12 @@
 package com.alibaba.fluss.utils;
 
 import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.TimestampLtz;
 import com.alibaba.fluss.row.TimestampNtz;
 
 import java.time.DateTimeException;
 import java.util.List;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -73,4 +75,10 @@ public class BinaryStringUtils {
             throws DateTimeException {
         return DateTimeUtils.parseTimestampData(input.toString(), precision);
     }
+
+    /**
+     * Used by {@code CAST(x as TIMESTAMPLTZ)}.
+     *
+     * <p>Converts {@code input} to a {@link String} and hands it, together with {@code precision}
+     * and {@code timeZone}, to {@code DateTimeUtils.parseTimestampData}.
+     *
+     * @param input the string value to parse
+     * @param precision the precision forwarded to the parser
+     * @param timeZone the time zone forwarded to the parser
+     * @return the parsed {@link TimestampLtz}
+     * @throws DateTimeException declared for parse failures raised by the delegate
+     */
+    public static TimestampLtz toTimestampltz(
+            BinaryString input, int precision, TimeZone timeZone) throws DateTimeException {
+        final String text = input.toString();
+        return DateTimeUtils.parseTimestampData(text, precision, timeZone);
+    }
 }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java 
b/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java
index 6e37bdac6..bf27a06d3 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/DateTimeUtils.java
@@ -17,6 +17,7 @@
 
 package com.alibaba.fluss.utils;
 
+import com.alibaba.fluss.row.TimestampLtz;
 import com.alibaba.fluss.row.TimestampNtz;
 
 import java.time.DateTimeException;
@@ -26,6 +27,7 @@ import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.TemporalAccessor;
+import java.util.TimeZone;
 
 import static java.time.temporal.ChronoField.DAY_OF_MONTH;
 import static java.time.temporal.ChronoField.HOUR_OF_DAY;
@@ -109,6 +111,14 @@ public class DateTimeUtils {
                 
fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision));
     }
 
+    /**
+     * Parses {@code dateStr} with {@code DEFAULT_TIMESTAMP_FORMATTER}, places the resulting
+     * date-time in {@code timeZone} and returns the corresponding instant as a
+     * {@link TimestampLtz}.
+     *
+     * @param dateStr the timestamp text to parse
+     * @param precision the precision passed to {@code fromTemporalAccessor}
+     * @param timeZone the zone in which the parsed date-time is interpreted
+     * @return the timestamp built from the resulting instant
+     * @throws DateTimeException declared for failures while parsing or converting
+     */
+    public static TimestampLtz parseTimestampData(
+            String dateStr, int precision, TimeZone timeZone) throws DateTimeException {
+        final TemporalAccessor parsed = DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr);
+        return TimestampLtz.fromInstant(
+                fromTemporalAccessor(parsed, precision).atZone(timeZone.toZoneId()).toInstant());
+    }
+
     public static Integer parseDate(String s) {
         // allow timestamp str to date, e.g. 2017-12-12 09:30:00.0
         int ws1 = s.indexOf(" ");
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateBuilderTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateBuilderTest.java
new file mode 100644
index 000000000..acb193fe3
--- /dev/null
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateBuilderTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PredicateBuilder}. */
+public class PredicateBuilderTest {
+
+    /** A range with both bounds set accepts 1, 2 and 3 but rejects 4 and a null field. */
+    @Test
+    public void testBetween() {
+        PredicateBuilder predicates = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate oneToThree = predicates.between(0, 1, 3);
+
+        assertThat(oneToThree.test(GenericRow.of(1))).isTrue();
+        assertThat(oneToThree.test(GenericRow.of(2))).isTrue();
+        assertThat(oneToThree.test(GenericRow.of(3))).isTrue();
+        assertThat(oneToThree.test(GenericRow.of(4))).isFalse();
+        assertThat(oneToThree.test(GenericRow.of((Object) null))).isFalse();
+    }
+
+    /** A range whose upper bound is null accepts no row at all. */
+    @Test
+    public void testBetweenNull() {
+        PredicateBuilder predicates = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate nullUpperBound = predicates.between(0, 1, null);
+
+        assertThat(nullUpperBound.test(GenericRow.of(1))).isFalse();
+        assertThat(nullUpperBound.test(GenericRow.of(2))).isFalse();
+        assertThat(nullUpperBound.test(GenericRow.of(3))).isFalse();
+        assertThat(nullUpperBound.test(GenericRow.of(4))).isFalse();
+        assertThat(nullUpperBound.test(GenericRow.of((Object) null))).isFalse();
+    }
+
+    /** Splitting a conjunction flattens nested ANDs but keeps a nested OR as one element. */
+    @Test
+    public void testSplitAnd() {
+        PredicateBuilder predicates =
+                new PredicateBuilder(
+                        RowType.of(
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType(),
+                                new IntType()));
+
+        Predicate anyOfFirstThreeNull =
+                PredicateBuilder.or(
+                        predicates.isNull(0), predicates.isNull(1), predicates.isNull(2));
+        Predicate allOfNextThreeNull =
+                PredicateBuilder.and(
+                        predicates.isNull(3), predicates.isNull(4), predicates.isNull(5));
+        Predicate lastNull = predicates.isNull(6);
+
+        Predicate conjunction =
+                PredicateBuilder.and(anyOfFirstThreeNull, allOfNextThreeNull, lastNull);
+
+        assertThat(PredicateBuilder.splitAnd(conjunction))
+                .isEqualTo(
+                        Arrays.asList(
+                                anyOfFirstThreeNull,
+                                predicates.isNull(3),
+                                predicates.isNull(4),
+                                predicates.isNull(5),
+                                lastNull));
+    }
+}
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java 
b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java
new file mode 100644
index 000000000..427fec8a8
--- /dev/null
+++ b/fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.predicate;
+
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.StringType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.alibaba.fluss.row.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Predicate}s. */
+public class PredicateTest {
+
+    /** Builder over a row type with a single {@link IntType} column. */
+    private static PredicateBuilder singleIntColumn() {
+        return new PredicateBuilder(RowType.of(new IntType()));
+    }
+
+    /** Builder over a row type with two {@link IntType} columns. */
+    private static PredicateBuilder twoIntColumns() {
+        return new PredicateBuilder(RowType.of(new IntType(), new IntType()));
+    }
+
+    /** Builder over a row type with a single {@link StringType} column. */
+    private static PredicateBuilder singleStringColumn() {
+        return new PredicateBuilder(RowType.of(new StringType()));
+    }
+
+    /** A one-column row whose only field is null. */
+    private static GenericRow nullRow() {
+        return GenericRow.of((Object) null);
+    }
+
+    /**
+     * Literal list {@code 1, [null,] 3, 10..29}. The tests using it assert that a list of this
+     * size is kept as a single {@link LeafPredicate}.
+     */
+    private static List<Object> largeLiterals(boolean includeNull) {
+        List<Object> literals = new ArrayList<>();
+        literals.add(1);
+        if (includeNull) {
+            literals.add(null);
+        }
+        literals.add(3);
+        for (int i = 10; i < 30; i++) {
+            literals.add(i);
+        }
+        return literals;
+    }
+
+    @Test
+    public void testEqual() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.equal(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(GenericRow.of(5))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.notEqual(0, 5));
+    }
+
+    @Test
+    public void testEqualNull() {
+        Predicate predicate = singleIntColumn().equal(0, null);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testNotEqual() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.notEqual(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(4))).isTrue();
+        assertThat(predicate.test(GenericRow.of(5))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.equal(0, 5));
+    }
+
+    @Test
+    public void testNotEqualNull() {
+        Predicate predicate = singleIntColumn().notEqual(0, null);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testGreater() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.greaterThan(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(GenericRow.of(5))).isFalse();
+        assertThat(predicate.test(GenericRow.of(6))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.lessOrEqual(0, 5));
+    }
+
+    @Test
+    public void testGreaterNull() {
+        Predicate predicate = singleIntColumn().greaterThan(0, null);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testGreaterOrEqual() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.greaterOrEqual(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(GenericRow.of(5))).isTrue();
+        assertThat(predicate.test(GenericRow.of(6))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.lessThan(0, 5));
+    }
+
+    @Test
+    public void testGreaterOrEqualNull() {
+        Predicate predicate = singleIntColumn().greaterOrEqual(0, null);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testLess() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.lessThan(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(4))).isTrue();
+        assertThat(predicate.test(GenericRow.of(5))).isFalse();
+        assertThat(predicate.test(GenericRow.of(6))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.greaterOrEqual(0, 5));
+    }
+
+    @Test
+    public void testLessNull() {
+        Predicate predicate = singleIntColumn().lessThan(0, null);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testLessOrEqual() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.lessOrEqual(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(4))).isTrue();
+        assertThat(predicate.test(GenericRow.of(5))).isTrue();
+        assertThat(predicate.test(GenericRow.of(6))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.greaterThan(0, 5));
+    }
+
+    @Test
+    public void testLessOrEqualNull() {
+        Predicate predicate = singleIntColumn().lessOrEqual(0, null);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testIsNull() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.isNull(0);
+
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+        assertThat(predicate.test(nullRow())).isTrue();
+
+        assertThat(predicate.negate()).hasValue(builder.isNotNull(0));
+    }
+
+    @Test
+    public void testIsNotNull() {
+        PredicateBuilder builder = singleIntColumn();
+        Predicate predicate = builder.isNotNull(0);
+
+        assertThat(predicate.test(GenericRow.of(4))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+
+        assertThat(predicate.negate()).hasValue(builder.isNull(0));
+    }
+
+    @Test
+    public void testIn() {
+        Predicate predicate = singleIntColumn().in(0, Arrays.asList(1, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isTrue();
+        assertThat(predicate.test(GenericRow.of(2))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testInNull() {
+        Predicate predicate = singleIntColumn().in(0, Arrays.asList(1, null, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isTrue();
+        assertThat(predicate.test(GenericRow.of(2))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testNotIn() {
+        Predicate predicate = singleIntColumn().notIn(0, Arrays.asList(1, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isFalse();
+        assertThat(predicate.test(GenericRow.of(2))).isTrue();
+        assertThat(predicate.test(GenericRow.of(3))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testNotInNull() {
+        Predicate predicate = singleIntColumn().notIn(0, Arrays.asList(1, null, 3));
+        assertThat(predicate).isInstanceOf(CompoundPredicate.class);
+
+        // A null literal in the list makes NOT IN reject every row, including non-members.
+        assertThat(predicate.test(GenericRow.of(1))).isFalse();
+        assertThat(predicate.test(GenericRow.of(2))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testEndsWith() {
+        Predicate predicate = singleStringColumn().endsWith(0, "bcc");
+        GenericRow row = GenericRow.of(fromString("aabbcc"));
+
+        assertThat(predicate.test(row)).isTrue();
+    }
+
+    @Test
+    public void testStartWith() {
+        Predicate predicate = singleStringColumn().startsWith(0, "aab");
+        GenericRow row = GenericRow.of(fromString("aabbcc"));
+
+        assertThat(predicate.test(row)).isTrue();
+    }
+
+    @Test
+    public void testContainsWith() {
+        Predicate predicate = singleStringColumn().contains(0, "def");
+        GenericRow matching = GenericRow.of(fromString("aabbdefcc"));
+        GenericRow nonMatching = GenericRow.of(fromString("aabbdcefcc"));
+
+        assertThat(predicate.test(matching)).isTrue();
+        assertThat(predicate.test(nonMatching)).isFalse();
+    }
+
+    @Test
+    public void testLargeIn() {
+        Predicate predicate = singleIntColumn().in(0, largeLiterals(false));
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isTrue();
+        assertThat(predicate.test(GenericRow.of(2))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testLargeInNull() {
+        Predicate predicate = singleIntColumn().in(0, largeLiterals(true));
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isTrue();
+        assertThat(predicate.test(GenericRow.of(2))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3))).isTrue();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testLargeNotIn() {
+        Predicate predicate = singleIntColumn().notIn(0, largeLiterals(false));
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isFalse();
+        assertThat(predicate.test(GenericRow.of(2))).isTrue();
+        assertThat(predicate.test(GenericRow.of(3))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testLargeNotInNull() {
+        Predicate predicate = singleIntColumn().notIn(0, largeLiterals(true));
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+
+        assertThat(predicate.test(GenericRow.of(1))).isFalse();
+        assertThat(predicate.test(GenericRow.of(2))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3))).isFalse();
+        assertThat(predicate.test(nullRow())).isFalse();
+    }
+
+    @Test
+    public void testAnd() {
+        PredicateBuilder builder = twoIntColumns();
+        Predicate predicate = PredicateBuilder.and(builder.equal(0, 3), builder.equal(1, 5));
+
+        assertThat(predicate.test(GenericRow.of(4, 5))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3, 6))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3, 5))).isTrue();
+        assertThat(predicate.test(GenericRow.of(null, 5))).isFalse();
+
+        assertThat(predicate.negate())
+                .hasValue(PredicateBuilder.or(builder.notEqual(0, 3), builder.notEqual(1, 5)));
+    }
+
+    @Test
+    public void testOr() {
+        PredicateBuilder builder = twoIntColumns();
+        Predicate predicate = PredicateBuilder.or(builder.equal(0, 3), builder.equal(1, 5));
+
+        assertThat(predicate.test(GenericRow.of(4, 6))).isFalse();
+        assertThat(predicate.test(GenericRow.of(3, 6))).isTrue();
+        assertThat(predicate.test(GenericRow.of(3, 5))).isTrue();
+        assertThat(predicate.test(GenericRow.of(null, 5))).isTrue();
+
+        assertThat(predicate.negate())
+                .hasValue(PredicateBuilder.and(builder.notEqual(0, 3), builder.notEqual(1, 5)));
+    }
+
+    @Test
+    public void testUnknownStats() {
+        // NOTE(review): the name suggests a statistics-based check was intended here - confirm.
+        // Row-level evaluation is asserted so that the test can actually fail.
+        Predicate predicate = singleIntColumn().equal(0, 5);
+
+        assertThat(predicate.test(GenericRow.of(5))).isTrue();
+        assertThat(predicate.test(GenericRow.of(4))).isFalse();
+    }
+
+    @Test
+    public void testPredicateToString() {
+        Predicate p1 = singleIntColumn().equal(0, 5);
+        assertThat(p1).hasToString("Equal(f0, 5)");
+
+        Predicate p2 = singleIntColumn().greaterThan(0, 5);
+        assertThat(p2).hasToString("GreaterThan(f0, 5)");
+
+        PredicateBuilder builder3 = twoIntColumns();
+        Predicate p3 = PredicateBuilder.and(builder3.equal(0, 3), builder3.equal(1, 5));
+        assertThat(p3).hasToString("And([Equal(f0, 3), Equal(f1, 5)])");
+
+        PredicateBuilder builder4 = twoIntColumns();
+        Predicate p4 = PredicateBuilder.or(builder4.equal(0, 3), builder4.equal(1, 5));
+        assertThat(p4).hasToString("Or([Equal(f0, 3), Equal(f1, 5)])");
+
+        Predicate p5 = singleIntColumn().isNotNull(0);
+        assertThat(p5).hasToString("IsNotNull(f0)");
+
+        // Short literal lists are expanded into nested Or/And of Equal/NotEqual leaves.
+        Predicate p6 = singleIntColumn().in(0, Arrays.asList(1, null, 3, 4));
+        assertThat(p6)
+                .hasToString(
+                        "Or([Or([Or([Equal(f0, 1), Equal(f0, null)]), Equal(f0, 3)]), Equal(f0, 4)])");
+
+        Predicate p7 = singleIntColumn().notIn(0, Arrays.asList(1, null, 3, 4));
+        assertThat(p7)
+                .hasToString(
+                        "And([And([And([NotEqual(f0, 1), NotEqual(f0, null)]), NotEqual(f0, 3)]), NotEqual(f0, 4)])");
+
+        // With 21 literals the predicate prints as a single In/NotIn leaf.
+        List<Object> literals = new ArrayList<>();
+        for (int i = 1; i <= 21; i++) {
+            literals.add(i);
+        }
+        Predicate p8 = singleIntColumn().in(0, literals);
+        assertThat(p8)
+                .hasToString(
+                        "In(f0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21])");
+
+        Predicate p9 = singleIntColumn().notIn(0, literals);
+        assertThat(p9)
+                .hasToString(
+                        "NotIn(f0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21])");
+    }
+}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 48288186b..5feb9c8ae 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -322,6 +322,8 @@
                                         
<exclude>com.alibaba.fluss.kafka.*</exclude>
                                         <!-- exclude for fluss-ci-tools -->
                                         
<exclude>com.alibaba.fluss.tools.ci.*</exclude>
+                                        <!-- exclude for predicate -->
+                                        
<exclude>com.alibaba.fluss.predicate.*</exclude>
                                         <!-- exclude for dummy class -->
                                         
<exclude>com.alibaba.fluss.dist.DummyClass</exclude>
                                         
<exclude>com.alibaba.fluss.flink.DummyClass120</exclude>

Reply via email to