This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/paimon-python.git

commit cbb81bbb66b86895fa18538fbeacfb0bd3dd758a
Author: yuzelin <[email protected]>
AuthorDate: Wed Nov 6 17:49:17 2024 +0800

    Read interface support predicate (#22)
    
    (cherry picked from commit 5e7d4687e1106f4fb928fb9bbab1e50d15b99ff9)
---
 .github/workflows/paimon-python-checks.yml         |   7 +
 paimon_python_api/__init__.py                      |   5 +-
 paimon_python_api/predicate.py                     |  95 +++++
 paimon_python_api/read_builder.py                  |  13 +-
 paimon_python_java/__init__.py                     |   7 +-
 paimon_python_java/gateway_server.py               |   2 +-
 paimon_python_java/java_gateway.py                 |   1 +
 .../org/apache/paimon/python/PredicationUtil.java  | 111 ++++++
 paimon_python_java/pypaimon.py                     | 103 +++++-
 paimon_python_java/tests/test_preicates.py         | 394 +++++++++++++++++++++
 paimon_python_java/tests/utils.py                  |   5 +
 11 files changed, 735 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index e94820b..195783f 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -43,7 +43,14 @@ jobs:
         with:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
+      - name: Set up hadoop dependency
+        run: |
+          mkdir -p ${{ github.workspace }}/temp
+          curl -L -o ${{ github.workspace }}/temp/bundled-hadoop.jar \
+          
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
       - name: Run lint-python.sh
+        env:
+          _PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace 
}}/temp/bundled-hadoop.jar
         run: |
           chmod +x dev/lint-python.sh
           ./dev/lint-python.sh
diff --git a/paimon_python_api/__init__.py b/paimon_python_api/__init__.py
index 86090c9..44717bf 100644
--- a/paimon_python_api/__init__.py
+++ b/paimon_python_api/__init__.py
@@ -19,6 +19,7 @@
 from .split import Split
 from .table_read import TableRead
 from .table_scan import TableScan, Plan
+from .predicate import Predicate, PredicateBuilder
 from .read_builder import ReadBuilder
 from .commit_message import CommitMessage
 from .table_commit import BatchTableCommit
@@ -39,5 +40,7 @@ __all__ = [
     'BatchWriteBuilder',
     'Table',
     'Schema',
-    'Catalog'
+    'Catalog',
+    'Predicate',
+    'PredicateBuilder'
 ]
diff --git a/paimon_python_api/predicate.py b/paimon_python_api/predicate.py
new file mode 100644
index 0000000..46280d1
--- /dev/null
+++ b/paimon_python_api/predicate.py
@@ -0,0 +1,95 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from abc import ABC, abstractmethod
+from typing import Any, List
+
+
+class Predicate(ABC):
+    """Predicate which evaluates to a boolean. Now it doesn't have
+    any methods because only paimon_python_java implement it and
+    the Java implementation convert it to Java object."""
+
+
+class PredicateBuilder(ABC):
+    """A utility class to create Predicate object for common filter 
conditions."""
+
+    @abstractmethod
+    def equal(self, field: str, literal: Any) -> Predicate:
+        """field = literal"""
+
+    @abstractmethod
+    def not_equal(self, field: str, literal: Any) -> Predicate:
+        """field <> literal"""
+
+    @abstractmethod
+    def less_than(self, field: str, literal: Any) -> Predicate:
+        """field < literal"""
+
+    @abstractmethod
+    def less_or_equal(self, field: str, literal: Any) -> Predicate:
+        """field <= literal"""
+
+    @abstractmethod
+    def greater_than(self, field: str, literal: Any) -> Predicate:
+        """field > literal"""
+
+    @abstractmethod
+    def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+        """field >= literal"""
+
+    @abstractmethod
+    def is_null(self, field: str) -> Predicate:
+        """field IS NULL"""
+
+    @abstractmethod
+    def is_not_null(self, field: str) -> Predicate:
+        """field IS NOT NULL"""
+
+    @abstractmethod
+    def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+        """field.startswith"""
+
+    @abstractmethod
+    def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+        """field.endswith()"""
+
+    @abstractmethod
+    def contains(self, field: str, pattern_literal: Any) -> Predicate:
+        """literal in field"""
+
+    @abstractmethod
+    def is_in(self, field: str, literals: List[Any]) -> Predicate:
+        """field IN literals"""
+
+    @abstractmethod
+    def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+        """field NOT IN literals"""
+
+    @abstractmethod
+    def between(self, field: str, included_lower_bound: Any, 
included_upper_bound: Any) \
+            -> Predicate:
+        """field BETWEEN included_lower_bound AND included_upper_bound"""
+
+    @abstractmethod
+    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+        """predicate1 AND predicate2 AND ..."""
+
+    @abstractmethod
+    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+        """predicate1 OR predicate2 OR ..."""
diff --git a/paimon_python_api/read_builder.py 
b/paimon_python_api/read_builder.py
index 94ec073..ad5e6d6 100644
--- a/paimon_python_api/read_builder.py
+++ b/paimon_python_api/read_builder.py
@@ -17,13 +17,20 @@
 
#################################################################################
 
 from abc import ABC, abstractmethod
-from paimon_python_api import TableRead, TableScan
+from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder
 from typing import List
 
 
 class ReadBuilder(ABC):
     """An interface for building the TableScan and TableRead."""
 
+    @abstractmethod
+    def with_filter(self, predicate: Predicate):
+        """
+        Push filters, will filter the data as much as possible,
+        but it is not guaranteed that it is a complete filter.
+        """
+
     @abstractmethod
     def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
         """Push nested projection."""
@@ -39,3 +46,7 @@ class ReadBuilder(ABC):
     @abstractmethod
     def new_read(self) -> TableRead:
         """Create a TableRead to read splits."""
+
+    @abstractmethod
+    def new_predicate_builder(self) -> PredicateBuilder:
+        """Create a builder for Predicate."""
diff --git a/paimon_python_java/__init__.py b/paimon_python_java/__init__.py
index 6e97d9e..9b0d002 100644
--- a/paimon_python_java/__init__.py
+++ b/paimon_python_java/__init__.py
@@ -18,7 +18,8 @@
 
 from .util import constants
 from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, 
TableRead,
-                       BatchWriteBuilder, BatchTableWrite, CommitMessage, 
BatchTableCommit)
+                       BatchWriteBuilder, BatchTableWrite, CommitMessage, 
BatchTableCommit,
+                       Predicate, PredicateBuilder)
 
 __all__ = [
     'constants',
@@ -32,5 +33,7 @@ __all__ = [
     'BatchWriteBuilder',
     'BatchTableWrite',
     'CommitMessage',
-    'BatchTableCommit'
+    'BatchTableCommit',
+    'Predicate',
+    'PredicateBuilder'
 ]
diff --git a/paimon_python_java/gateway_server.py 
b/paimon_python_java/gateway_server.py
index 7285e98..2061d59 100644
--- a/paimon_python_java/gateway_server.py
+++ b/paimon_python_java/gateway_server.py
@@ -103,7 +103,7 @@ def _get_hadoop_classpath(env):
         return env[constants.PYPAIMON_HADOOP_CLASSPATH]
 
     if 'HADOOP_CLASSPATH' in env:
-        return None
+        return env['HADOOP_CLASSPATH']
     else:
         raise EnvironmentError(f"You haven't set 
'{constants.PYPAIMON_HADOOP_CLASSPATH}', \
  and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.")
diff --git a/paimon_python_java/java_gateway.py 
b/paimon_python_java/java_gateway.py
index f2b1621..3dabcfd 100644
--- a/paimon_python_java/java_gateway.py
+++ b/paimon_python_java/java_gateway.py
@@ -109,6 +109,7 @@ def import_paimon_view(gateway):
     java_import(gateway.jvm, 'org.apache.paimon.types.*')
     java_import(gateway.jvm, 'org.apache.paimon.python.*')
     java_import(gateway.jvm, "org.apache.paimon.data.*")
+    java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")
 
 
 class Watchdog(object):
diff --git 
a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
 
b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
new file mode 100644
index 0000000..a863dfd
--- /dev/null
+++ 
b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.python;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** For building Predicate. */
+public class PredicationUtil {
+
+    public static Predicate build(
+            RowType rowType,
+            PredicateBuilder builder,
+            String method,
+            int index,
+            List<Object> literals) {
+        literals =
+                literals.stream()
+                        .map(l -> convertJavaObject(rowType.getTypeAt(index), 
l))
+                        .collect(Collectors.toList());
+        switch (method) {
+            case "equal":
+                return builder.equal(index, literals.get(0));
+            case "notEqual":
+                return builder.notEqual(index, literals.get(0));
+            case "lessThan":
+                return builder.lessThan(index, literals.get(0));
+            case "lessOrEqual":
+                return builder.lessOrEqual(index, literals.get(0));
+            case "greaterThan":
+                return builder.greaterThan(index, literals.get(0));
+            case "greaterOrEqual":
+                return builder.greaterOrEqual(index, literals.get(0));
+            case "isNull":
+                return builder.isNull(index);
+            case "isNotNull":
+                return builder.isNotNull(index);
+            case "startsWith":
+                return builder.startsWith(index, literals.get(0));
+            case "endsWith":
+                return builder.endsWith(index, literals.get(0));
+            case "contains":
+                return builder.contains(index, literals.get(0));
+            case "in":
+                return builder.in(index, literals);
+            case "notIn":
+                return builder.notIn(index, literals);
+            case "between":
+                return builder.between(index, literals.get(0), 
literals.get(1));
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown PredicateBuilder method " + method);
+        }
+    }
+
+    /** Some type is not convenient to transfer from Python to Java. */
+    private static Object convertJavaObject(DataType literalType, Object 
literal) {
+        switch (literalType.getTypeRoot()) {
+            case BOOLEAN:
+            case DOUBLE:
+            case INTEGER:
+                return literal;
+            case CHAR:
+            case VARCHAR:
+                return BinaryString.fromString((String) literal);
+            case FLOAT:
+                return ((Number) literal).floatValue();
+            case TINYINT:
+                return ((Number) literal).byteValue();
+            case SMALLINT:
+                return ((Number) literal).shortValue();
+            case BIGINT:
+                return ((Number) literal).longValue();
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported predicate leaf type " + 
literalType.getTypeRoot().name());
+        }
+    }
+
+    public static Predicate buildAnd(List<Predicate> predicates) {
+        // 'and' is keyword of Python
+        return PredicateBuilder.and(predicates);
+    }
+
+    public static Predicate buildOr(List<Predicate> predicates) {
+        // 'or' is keyword of Python
+        return PredicateBuilder.or(predicates);
+    }
+}
diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py
index fcf0695..0d3101b 100644
--- a/paimon_python_java/pypaimon.py
+++ b/paimon_python_java/pypaimon.py
@@ -22,8 +22,9 @@ import pyarrow as pa
 from paimon_python_java.java_gateway import get_gateway
 from paimon_python_java.util import java_utils, constants
 from paimon_python_api import (catalog, table, read_builder, table_scan, 
split, table_read,
-                               write_builder, table_write, commit_message, 
table_commit, Schema)
-from typing import List, Iterator, Optional
+                               write_builder, table_write, commit_message, 
table_commit, Schema,
+                               predicate)
+from typing import List, Iterator, Optional, Any
 
 
 class Catalog(catalog.Catalog):
@@ -85,6 +86,10 @@ class ReadBuilder(read_builder.ReadBuilder):
         self._catalog_options = catalog_options
         self._arrow_schema = arrow_schema
 
+    def with_filter(self, predicate: 'Predicate'):
+        self._j_read_builder.withFilter(predicate.to_j_predicate())
+        return self
+
     def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
         self._j_read_builder.withProjection(projection)
         return self
@@ -98,9 +103,12 @@ class ReadBuilder(read_builder.ReadBuilder):
         return TableScan(j_table_scan)
 
     def new_read(self) -> 'TableRead':
-        j_table_read = self._j_read_builder.newRead()
+        j_table_read = self._j_read_builder.newRead().executeFilter()
         return TableRead(j_table_read, self._j_row_type, 
self._catalog_options, self._arrow_schema)
 
+    def new_predicate_builder(self) -> 'PredicateBuilder':
+        return PredicateBuilder(self._j_row_type)
+
 
 class TableScan(table_scan.TableScan):
 
@@ -257,3 +265,92 @@ class BatchTableCommit(table_commit.BatchTableCommit):
 
     def close(self):
         self._j_batch_table_commit.close()
+
+
+class Predicate(predicate.Predicate):
+
+    def __init__(self, j_predicate):
+        self._j_predicate = j_predicate
+
+    def to_j_predicate(self):
+        return self._j_predicate
+
+
+class PredicateBuilder(predicate.PredicateBuilder):
+
+    def __init__(self, j_row_type):
+        self._field_names = j_row_type.getFieldNames()
+        self._j_row_type = j_row_type
+        self._j_predicate_builder = 
get_gateway().jvm.PredicateBuilder(j_row_type)
+
+    def _build(self, method: str, field: str, literals: Optional[List[Any]] = 
None):
+        error = ValueError(f'The field {field} is not in field list 
{self._field_names}.')
+        try:
+            index = self._field_names.index(field)
+            if index == -1:
+                raise error
+        except ValueError:
+            raise error
+
+        if literals is None:
+            literals = []
+
+        j_predicate = get_gateway().jvm.PredicationUtil.build(
+            self._j_row_type,
+            self._j_predicate_builder,
+            method,
+            index,
+            literals
+        )
+        return Predicate(j_predicate)
+
+    def equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('equal', field, [literal])
+
+    def not_equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('notEqual', field, [literal])
+
+    def less_than(self, field: str, literal: Any) -> Predicate:
+        return self._build('lessThan', field, [literal])
+
+    def less_or_equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('lessOrEqual', field, [literal])
+
+    def greater_than(self, field: str, literal: Any) -> Predicate:
+        return self._build('greaterThan', field, [literal])
+
+    def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+        return self._build('greaterOrEqual', field, [literal])
+
+    def is_null(self, field: str) -> Predicate:
+        return self._build('isNull', field)
+
+    def is_not_null(self, field: str) -> Predicate:
+        return self._build('isNotNull', field)
+
+    def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+        return self._build('startsWith', field, [pattern_literal])
+
+    def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+        return self._build('endsWith', field, [pattern_literal])
+
+    def contains(self, field: str, pattern_literal: Any) -> Predicate:
+        return self._build('contains', field, [pattern_literal])
+
+    def is_in(self, field: str, literals: List[Any]) -> Predicate:
+        return self._build('in', field, literals)
+
+    def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+        return self._build('notIn', field, literals)
+
+    def between(self, field: str, included_lower_bound: Any, 
included_upper_bound: Any) \
+            -> Predicate:
+        return self._build('between', field, [included_lower_bound, 
included_upper_bound])
+
+    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        return 
Predicate(get_gateway().jvm.PredicationUtil.buildAnd(predicates))
+
+    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        return Predicate(get_gateway().jvm.PredicationUtil.buildOr(predicates))
diff --git a/paimon_python_java/tests/test_preicates.py 
b/paimon_python_java/tests/test_preicates.py
new file mode 100644
index 0000000..7ee1a91
--- /dev/null
+++ b/paimon_python_java/tests/test_preicates.py
@@ -0,0 +1,394 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+import random
+import pandas as pd
+import pyarrow as pa
+
+from paimon_python_api import Schema
+from paimon_python_java import Catalog
+from paimon_python_java.tests import utils
+from setup_utils import java_setuputils
+
+
+def _check_filtered_result(read_builder, expected_df):
+    scan = read_builder.new_scan()
+    read = read_builder.new_read()
+    actual_df = read.to_pandas(scan.plan().splits())
+    pd.testing.assert_frame_equal(
+        actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))
+
+
+# TODO: parquet has bug now
+def _random_format():
+    return random.choice(['avro', 'orc'])
+
+
+class PredicateTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        java_setuputils.setup_java_bridge()
+        cls.hadoop_path = tempfile.mkdtemp()
+        utils.setup_hadoop_bundle_jar(cls.hadoop_path)
+        cls.warehouse = tempfile.mkdtemp()
+
+        catalog = Catalog.create({'warehouse': cls.warehouse})
+        catalog.create_database('default', False)
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+        ])
+        catalog.create_table('default.test_append',
+                             Schema(pa_schema, options={'file.format': 
_random_format()}),
+                             False)
+        catalog.create_table('default.test_pk',
+                             Schema(pa_schema, primary_keys=['f0'],
+                                    options={'bucket': '1', 'file.format': 
_random_format()}),
+                             False)
+
+        df = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5],
+            'f1': ['abc', 'abbc', 'bc', 'd', None],
+        })
+
+        append_table = catalog.get_table('default.test_append')
+        write_builder = append_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        pk_table = catalog.get_table('default.test_pk')
+        write_builder = pk_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        cls.catalog = catalog
+        cls.df = df
+
+    @classmethod
+    def tearDownClass(cls):
+        java_setuputils.clean()
+        if os.path.exists(cls.hadoop_path):
+            shutil.rmtree(cls.hadoop_path)
+        if os.path.exists(cls.warehouse):
+            shutil.rmtree(cls.warehouse)
+
+    def testWrongFieldName(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        with self.assertRaises(ValueError) as e:
+            predicate_builder.equal('f2', 'a')
+        self.assertEqual(str(e.exception), "The field f2 is not in field list 
['f0', 'f1'].")
+
+    def testAppendWithDuplicate(self):
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+        ])
+        self.catalog.create_table('default.test_append_with_duplicate', 
Schema(pa_schema), False)
+
+        df = pd.DataFrame({
+            'f0': [1, 1, 2, 2],
+            'f1': ['a', 'b', 'c', 'd'],
+        })
+
+        table = self.catalog.get_table('default.test_append_with_duplicate')
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        predicate = predicate_builder.equal('f0', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[0:1])
+
+        predicate = predicate_builder.equal('f0', 0)
+        read_builder = table.new_read_builder().with_filter(predicate)
+        scan = read_builder.new_scan()
+        read = read_builder.new_read()
+        actual_df = read.to_pandas(scan.plan().splits())
+        self.assertEqual(len(actual_df), 0)
+
+    def testAllFieldTypesWithEqual(self):
+        pa_schema = pa.schema([
+            # int
+            ('_tinyint', pa.int8()),
+            ('_smallint', pa.int16()),
+            ('_int', pa.int32()),
+            ('_bigint', pa.int64()),
+            # float
+            ('_float16', pa.float32()),  # NOTE: cannot write pa.float16() 
data into Paimon
+            ('_float32', pa.float32()),
+            ('_double', pa.float64()),
+            # string
+            ('_string', pa.string()),
+            # bool
+            ('_boolean', pa.bool_())
+        ])
+        self.catalog.create_table('default.test_all_field_types',
+                                  Schema(pa_schema, options={'file.format': 
_random_format()}),
+                                  False)
+        table = self.catalog.get_table('default.test_all_field_types')
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+
+        df = pd.DataFrame({
+            '_tinyint': pd.Series([1, 2], dtype='int8'),
+            '_smallint': pd.Series([10, 20], dtype='int16'),
+            '_int': pd.Series([100, 200], dtype='int32'),
+            '_bigint': pd.Series([1000, 2000], dtype='int64'),
+            '_float16': pd.Series([1.0, 2.0], dtype='float16'),
+            '_float32': pd.Series([1.00, 2.00], dtype='float32'),
+            '_double': pd.Series([1.000, 2.000], dtype='double'),
+            '_string': pd.Series(['A', 'B'], dtype='object'),
+            '_boolean': [True, False]
+        })
+        record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
+        # prepare for assertion
+        df['_float16'] = df['_float16'].astype('float32')
+
+        write.write_arrow_batch(record_batch)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        predicate = predicate_builder.equal('_tinyint', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_smallint', 20)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_int', 100)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_bigint', 2000)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_float16', 1.0)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_float32', 2.00)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_double', 1.000)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_string', 'B')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_boolean', True)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+    def testEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.equal('f0', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[0]])
+
+    def testNotEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.not_equal('f0', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:4])
+
+    def testNotEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.not_equal('f0', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:4])
+
+    def testLessThanAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_than('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testLessThanPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_than('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testLessOrEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_or_equal('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testLessOrEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_or_equal('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testGreaterThanAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_than('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[3:4])
+
+    def testGreaterThanPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_than('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[3:4])
+
+    def testGreaterOrEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_or_equal('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
+
+    def testGreaterOrEqualPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_or_equal('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
+
+    def testIsNullAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[4]])
+
+    def testIsNullPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[4]])
+
+    def testIsNotNullAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:3])
+
+    def testIsNotNullPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:3])
+
+    def testStartswithAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.startswith('f1', 'ab')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testStartswithPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.startswith('f1', 'ab')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testEndswithAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.endswith('f1', 'bc')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testEndswithPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.endswith('f1', 'bc')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testContainsAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.contains('f1', 'bb')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[1]])
+
+    def testContainsPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.contains('f1', 'bb')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[1]])
+
+    def testIsInAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_in('f0', [1, 2])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testIsInPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_in('f1', ['abc', 'd'])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[0, 3]])
+
+    def testIsNotInAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_in('f0', [1, 2])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
+
+    def testIsNotInPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_in('f1', ['abc', 'd'])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:2])
+
+    def testBetweenAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.between('f0', 1, 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testBetweenPk(self):
+        table = self.catalog.get_table('default.test_pk')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.between('f0', 1, 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testAndPredicates(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate1 = predicate_builder.greater_than('f0', 1)
+        predicate2 = predicate_builder.startswith('f1', 'ab')
+        predicate = predicate_builder.and_predicates([predicate1, predicate2])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[1]])
+
+    def testOrPredicates(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate1 = predicate_builder.greater_than('f0', 3)
+        predicate2 = predicate_builder.less_than('f0', 2)
+        predicate = predicate_builder.or_predicates([predicate1, predicate2])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate),
+                               self.df.loc[[0, 3, 4]])
diff --git a/paimon_python_java/tests/utils.py 
b/paimon_python_java/tests/utils.py
index 2af2fe0..350f80e 100644
--- a/paimon_python_java/tests/utils.py
+++ b/paimon_python_java/tests/utils.py
@@ -23,6 +23,11 @@ from paimon_python_java.util import constants
 
 
 def setup_hadoop_bundle_jar(hadoop_dir):
+    if constants.PYPAIMON_HADOOP_CLASSPATH in os.environ:
+        file = os.environ[constants.PYPAIMON_HADOOP_CLASSPATH]
+        if os.path.isfile(file):
+            return
+
     url = 'https://repo.maven.apache.org/maven2/org/apache/flink/' \
           
'flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar'
 


Reply via email to