This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git
The following commit(s) were added to refs/heads/main by this push:
new 5e7d468 Read interface support predicate (#22)
5e7d468 is described below
commit 5e7d4687e1106f4fb928fb9bbab1e50d15b99ff9
Author: yuzelin <[email protected]>
AuthorDate: Wed Nov 6 17:49:17 2024 +0800
Read interface support predicate (#22)
---
.github/workflows/paimon-python-checks.yml | 7 +
paimon_python_api/__init__.py | 5 +-
paimon_python_api/predicate.py | 95 +++++
paimon_python_api/read_builder.py | 13 +-
paimon_python_java/__init__.py | 7 +-
paimon_python_java/gateway_server.py | 2 +-
paimon_python_java/java_gateway.py | 1 +
.../org/apache/paimon/python/PredicationUtil.java | 111 ++++++
paimon_python_java/pypaimon.py | 103 +++++-
paimon_python_java/tests/test_preicates.py | 394 +++++++++++++++++++++
paimon_python_java/tests/utils.py | 5 +
11 files changed, 735 insertions(+), 8 deletions(-)
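For context, the read path this change enables looks roughly like the
following (a sketch, not part of the diff; the warehouse path and the
table/field names are placeholders mirroring the tests added below):

    from paimon_python_java import Catalog

    # Create a filesystem catalog and fetch a table (placeholder names).
    catalog = Catalog.create({'warehouse': '/tmp/warehouse'})
    table = catalog.get_table('default.my_table')

    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()

    # f0 > 3 AND f1 IS NOT NULL
    predicate = predicate_builder.and_predicates([
        predicate_builder.greater_than('f0', 3),
        predicate_builder.is_not_null('f1'),
    ])

    # Push the filter down, then scan and read as pandas.
    read_builder = read_builder.with_filter(predicate)
    scan = read_builder.new_scan()
    table_read = read_builder.new_read()
    result_df = table_read.to_pandas(scan.plan().splits())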
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index e94820b..195783f 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -43,7 +43,14 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
+ - name: Set up hadoop dependency
+ run: |
+ mkdir -p ${{ github.workspace }}/temp
+ curl -L -o ${{ github.workspace }}/temp/bundled-hadoop.jar \
+ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- name: Run lint-python.sh
+ env:
+ _PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace }}/temp/bundled-hadoop.jar
run: |
chmod +x dev/lint-python.sh
./dev/lint-python.sh
diff --git a/paimon_python_api/__init__.py b/paimon_python_api/__init__.py
index 86090c9..44717bf 100644
--- a/paimon_python_api/__init__.py
+++ b/paimon_python_api/__init__.py
@@ -19,6 +19,7 @@
from .split import Split
from .table_read import TableRead
from .table_scan import TableScan, Plan
+from .predicate import Predicate, PredicateBuilder
from .read_builder import ReadBuilder
from .commit_message import CommitMessage
from .table_commit import BatchTableCommit
@@ -39,5 +40,7 @@ __all__ = [
'BatchWriteBuilder',
'Table',
'Schema',
- 'Catalog'
+ 'Catalog',
+ 'Predicate',
+ 'PredicateBuilder'
]
diff --git a/paimon_python_api/predicate.py b/paimon_python_api/predicate.py
new file mode 100644
index 0000000..46280d1
--- /dev/null
+++ b/paimon_python_api/predicate.py
@@ -0,0 +1,95 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from abc import ABC, abstractmethod
+from typing import Any, List
+
+
+class Predicate(ABC):
+ """Predicate which evaluates to a boolean. Now it doesn't have
+ any methods because only paimon_python_java implement it and
+ the Java implementation convert it to Java object."""
+
+
+class PredicateBuilder(ABC):
+ """A utility class to create Predicate object for common filter
conditions."""
+
+ @abstractmethod
+ def equal(self, field: str, literal: Any) -> Predicate:
+ """field = literal"""
+
+ @abstractmethod
+ def not_equal(self, field: str, literal: Any) -> Predicate:
+ """field <> literal"""
+
+ @abstractmethod
+ def less_than(self, field: str, literal: Any) -> Predicate:
+ """field < literal"""
+
+ @abstractmethod
+ def less_or_equal(self, field: str, literal: Any) -> Predicate:
+ """field <= literal"""
+
+ @abstractmethod
+ def greater_than(self, field: str, literal: Any) -> Predicate:
+ """field > literal"""
+
+ @abstractmethod
+ def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+ """field >= literal"""
+
+ @abstractmethod
+ def is_null(self, field: str) -> Predicate:
+ """field IS NULL"""
+
+ @abstractmethod
+ def is_not_null(self, field: str) -> Predicate:
+ """field IS NOT NULL"""
+
+ @abstractmethod
+ def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+ """field.startswith"""
+
+ @abstractmethod
+ def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+ """field.endswith()"""
+
+ @abstractmethod
+ def contains(self, field: str, pattern_literal: Any) -> Predicate:
+ """literal in field"""
+
+ @abstractmethod
+ def is_in(self, field: str, literals: List[Any]) -> Predicate:
+ """field IN literals"""
+
+ @abstractmethod
+ def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+ """field NOT IN literals"""
+
+ @abstractmethod
+ def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+ -> Predicate:
+ """field BETWEEN included_lower_bound AND included_upper_bound"""
+
+ @abstractmethod
+ def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+ """predicate1 AND predicate2 AND ..."""
+
+ @abstractmethod
+ def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+ """predicate1 OR predicate2 OR ..."""
diff --git a/paimon_python_api/read_builder.py b/paimon_python_api/read_builder.py
index 94ec073..ad5e6d6 100644
--- a/paimon_python_api/read_builder.py
+++ b/paimon_python_api/read_builder.py
@@ -17,13 +17,20 @@
#################################################################################
from abc import ABC, abstractmethod
-from paimon_python_api import TableRead, TableScan
+from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder
from typing import List
class ReadBuilder(ABC):
"""An interface for building the TableScan and TableRead."""
+ @abstractmethod
+ def with_filter(self, predicate: Predicate):
+ """
+ Push down filters. The data will be filtered as much as possible,
+ but the filtering is not guaranteed to be complete.
+ """
+
@abstractmethod
def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
"""Push nested projection."""
@@ -39,3 +46,7 @@ class ReadBuilder(ABC):
@abstractmethod
def new_read(self) -> TableRead:
"""Create a TableRead to read splits."""
+
+ @abstractmethod
+ def new_predicate_builder(self) -> PredicateBuilder:
+ """Create a builder for Predicate."""
diff --git a/paimon_python_java/__init__.py b/paimon_python_java/__init__.py
index 6e97d9e..9b0d002 100644
--- a/paimon_python_java/__init__.py
+++ b/paimon_python_java/__init__.py
@@ -18,7 +18,8 @@
from .util import constants
from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead,
- BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit)
+ BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit,
+ Predicate, PredicateBuilder)
__all__ = [
'constants',
@@ -32,5 +33,7 @@ __all__ = [
'BatchWriteBuilder',
'BatchTableWrite',
'CommitMessage',
- 'BatchTableCommit'
+ 'BatchTableCommit',
+ 'Predicate',
+ 'PredicateBuilder'
]
diff --git a/paimon_python_java/gateway_server.py b/paimon_python_java/gateway_server.py
index 7285e98..2061d59 100644
--- a/paimon_python_java/gateway_server.py
+++ b/paimon_python_java/gateway_server.py
@@ -103,7 +103,7 @@ def _get_hadoop_classpath(env):
return env[constants.PYPAIMON_HADOOP_CLASSPATH]
if 'HADOOP_CLASSPATH' in env:
- return None
+ return env['HADOOP_CLASSPATH']
else:
raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \
and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.")
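The corrected lookup, restated as a self-contained sketch (previously the
HADOOP_CLASSPATH branch wrongly returned None; the error message here is
illustrative, not the exact one above):

    import os

    # Prefer the pypaimon-specific variable set by the workflow above,
    # then fall back to the standard HADOOP_CLASSPATH.
    def get_hadoop_classpath(env=os.environ):
        if '_PYPAIMON_HADOOP_CLASSPATH' in env:
            return env['_PYPAIMON_HADOOP_CLASSPATH']
        if 'HADOOP_CLASSPATH' in env:
            return env['HADOOP_CLASSPATH']
        raise EnvironmentError('Set _PYPAIMON_HADOOP_CLASSPATH or HADOOP_CLASSPATH.')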
diff --git a/paimon_python_java/java_gateway.py b/paimon_python_java/java_gateway.py
index f2b1621..3dabcfd 100644
--- a/paimon_python_java/java_gateway.py
+++ b/paimon_python_java/java_gateway.py
@@ -109,6 +109,7 @@ def import_paimon_view(gateway):
java_import(gateway.jvm, 'org.apache.paimon.types.*')
java_import(gateway.jvm, 'org.apache.paimon.python.*')
java_import(gateway.jvm, "org.apache.paimon.data.*")
+ java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")
class Watchdog(object):
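The new java_import makes the class addressable by its simple name on the
gateway's JVM view, which pypaimon.py below relies on; a minimal sketch
(assuming a started gateway and a j_row_type obtained from a table):

    # Without the import, the fully qualified class name would be needed.
    gateway = get_gateway()
    j_builder = gateway.jvm.PredicateBuilder(j_row_type)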
diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
new file mode 100644
index 0000000..a863dfd
--- /dev/null
+++ b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.python;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** For building Predicate. */
+public class PredicationUtil {
+
+ public static Predicate build(
+ RowType rowType,
+ PredicateBuilder builder,
+ String method,
+ int index,
+ List<Object> literals) {
+ literals =
+ literals.stream()
+ .map(l -> convertJavaObject(rowType.getTypeAt(index), l))
+ .collect(Collectors.toList());
+ switch (method) {
+ case "equal":
+ return builder.equal(index, literals.get(0));
+ case "notEqual":
+ return builder.notEqual(index, literals.get(0));
+ case "lessThan":
+ return builder.lessThan(index, literals.get(0));
+ case "lessOrEqual":
+ return builder.lessOrEqual(index, literals.get(0));
+ case "greaterThan":
+ return builder.greaterThan(index, literals.get(0));
+ case "greaterOrEqual":
+ return builder.greaterOrEqual(index, literals.get(0));
+ case "isNull":
+ return builder.isNull(index);
+ case "isNotNull":
+ return builder.isNotNull(index);
+ case "startsWith":
+ return builder.startsWith(index, literals.get(0));
+ case "endsWith":
+ return builder.endsWith(index, literals.get(0));
+ case "contains":
+ return builder.contains(index, literals.get(0));
+ case "in":
+ return builder.in(index, literals);
+ case "notIn":
+ return builder.notIn(index, literals);
+ case "between":
+ return builder.between(index, literals.get(0), literals.get(1));
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown PredicateBuilder method " + method);
+ }
+ }
+
+ /** Some types are not convenient to transfer from Python to Java. */
+ private static Object convertJavaObject(DataType literalType, Object literal) {
+ switch (literalType.getTypeRoot()) {
+ case BOOLEAN:
+ case DOUBLE:
+ case INTEGER:
+ return literal;
+ case CHAR:
+ case VARCHAR:
+ return BinaryString.fromString((String) literal);
+ case FLOAT:
+ return ((Number) literal).floatValue();
+ case TINYINT:
+ return ((Number) literal).byteValue();
+ case SMALLINT:
+ return ((Number) literal).shortValue();
+ case BIGINT:
+ return ((Number) literal).longValue();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported predicate leaf type " +
literalType.getTypeRoot().name());
+ }
+ }
+
+ public static Predicate buildAnd(List<Predicate> predicates) {
+ // 'and' is a keyword in Python
+ return PredicateBuilder.and(predicates);
+ }
+
+ public static Predicate buildOr(List<Predicate> predicates) {
+ // 'or' is a keyword in Python
+ return PredicateBuilder.or(predicates);
+ }
+}
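Because of convertJavaObject above, plain Python literals work against
narrower column types: py4j typically transfers a Python int as
java.lang.Integer and a str as java.lang.String, and the bridge narrows,
widens, or wraps them to match the column. Examples grounded in
testAllFieldTypesWithEqual below:

    predicate_builder.equal('_tinyint', 1)    # Integer narrowed to byte
    predicate_builder.equal('_bigint', 2000)  # Integer widened to long
    predicate_builder.equal('_string', 'B')   # String wrapped as BinaryString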
diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py
index fcf0695..0d3101b 100644
--- a/paimon_python_java/pypaimon.py
+++ b/paimon_python_java/pypaimon.py
@@ -22,8 +22,9 @@ import pyarrow as pa
from paimon_python_java.java_gateway import get_gateway
from paimon_python_java.util import java_utils, constants
from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
- write_builder, table_write, commit_message, table_commit, Schema)
-from typing import List, Iterator, Optional
+ write_builder, table_write, commit_message, table_commit, Schema,
+ predicate)
+from typing import List, Iterator, Optional, Any
class Catalog(catalog.Catalog):
@@ -85,6 +86,10 @@ class ReadBuilder(read_builder.ReadBuilder):
self._catalog_options = catalog_options
self._arrow_schema = arrow_schema
+ def with_filter(self, predicate: 'Predicate'):
+ self._j_read_builder.withFilter(predicate.to_j_predicate())
+ return self
+
def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
self._j_read_builder.withProjection(projection)
return self
@@ -98,9 +103,12 @@ class ReadBuilder(read_builder.ReadBuilder):
return TableScan(j_table_scan)
def new_read(self) -> 'TableRead':
- j_table_read = self._j_read_builder.newRead()
+ j_table_read = self._j_read_builder.newRead().executeFilter()
return TableRead(j_table_read, self._j_row_type,
self._catalog_options, self._arrow_schema)
+ def new_predicate_builder(self) -> 'PredicateBuilder':
+ return PredicateBuilder(self._j_row_type)
+
class TableScan(table_scan.TableScan):
@@ -257,3 +265,92 @@ class BatchTableCommit(table_commit.BatchTableCommit):
def close(self):
self._j_batch_table_commit.close()
+
+
+class Predicate(predicate.Predicate):
+
+ def __init__(self, j_predicate):
+ self._j_predicate = j_predicate
+
+ def to_j_predicate(self):
+ return self._j_predicate
+
+
+class PredicateBuilder(predicate.PredicateBuilder):
+
+ def __init__(self, j_row_type):
+ self._field_names = j_row_type.getFieldNames()
+ self._j_row_type = j_row_type
+ self._j_predicate_builder = get_gateway().jvm.PredicateBuilder(j_row_type)
+
+ def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
+ error = ValueError(f'The field {field} is not in field list {self._field_names}.')
+ try:
+ index = self._field_names.index(field)
+ if index == -1:
+ raise error
+ except ValueError:
+ raise error
+
+ if literals is None:
+ literals = []
+
+ j_predicate = get_gateway().jvm.PredicationUtil.build(
+ self._j_row_type,
+ self._j_predicate_builder,
+ method,
+ index,
+ literals
+ )
+ return Predicate(j_predicate)
+
+ def equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('equal', field, [literal])
+
+ def not_equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('notEqual', field, [literal])
+
+ def less_than(self, field: str, literal: Any) -> Predicate:
+ return self._build('lessThan', field, [literal])
+
+ def less_or_equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('lessOrEqual', field, [literal])
+
+ def greater_than(self, field: str, literal: Any) -> Predicate:
+ return self._build('greaterThan', field, [literal])
+
+ def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('greaterOrEqual', field, [literal])
+
+ def is_null(self, field: str) -> Predicate:
+ return self._build('isNull', field)
+
+ def is_not_null(self, field: str) -> Predicate:
+ return self._build('isNotNull', field)
+
+ def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+ return self._build('startsWith', field, [pattern_literal])
+
+ def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+ return self._build('endsWith', field, [pattern_literal])
+
+ def contains(self, field: str, pattern_literal: Any) -> Predicate:
+ return self._build('contains', field, [pattern_literal])
+
+ def is_in(self, field: str, literals: List[Any]) -> Predicate:
+ return self._build('in', field, literals)
+
+ def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+ return self._build('notIn', field, literals)
+
+ def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+ -> Predicate:
+ return self._build('between', field, [included_lower_bound, included_upper_bound])
+
+ def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+ predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+ return Predicate(get_gateway().jvm.PredicationUtil.buildAnd(predicates))
+
+ def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+ predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+ return Predicate(get_gateway().jvm.PredicationUtil.buildOr(predicates))
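Note that _build validates the field name on the Python side before
crossing into the JVM, so a typo fails fast; a sketch mirroring
testWrongFieldName below:

    try:
        predicate_builder.equal('no_such_field', 1)
    except ValueError as e:
        print(e)  # The field no_such_field is not in field list ['f0', 'f1'].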
diff --git a/paimon_python_java/tests/test_preicates.py b/paimon_python_java/tests/test_preicates.py
new file mode 100644
index 0000000..7ee1a91
--- /dev/null
+++ b/paimon_python_java/tests/test_preicates.py
@@ -0,0 +1,394 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+import random
+import pandas as pd
+import pyarrow as pa
+
+from paimon_python_api import Schema
+from paimon_python_java import Catalog
+from paimon_python_java.tests import utils
+from setup_utils import java_setuputils
+
+
+def _check_filtered_result(read_builder, expected_df):
+ scan = read_builder.new_scan()
+ read = read_builder.new_read()
+ actual_df = read.to_pandas(scan.plan().splits())
+ pd.testing.assert_frame_equal(
+ actual_df.reset_index(drop=True), expected_df.reset_index(drop=True))
+
+
+# TODO: parquet currently has a bug
+def _random_format():
+ return random.choice(['avro', 'orc'])
+
+
+class PredicateTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ java_setuputils.setup_java_bridge()
+ cls.hadoop_path = tempfile.mkdtemp()
+ utils.setup_hadoop_bundle_jar(cls.hadoop_path)
+ cls.warehouse = tempfile.mkdtemp()
+
+ catalog = Catalog.create({'warehouse': cls.warehouse})
+ catalog.create_database('default', False)
+
+ pa_schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string()),
+ ])
+ catalog.create_table('default.test_append',
+ Schema(pa_schema, options={'file.format': _random_format()}),
+ False)
+ catalog.create_table('default.test_pk',
+ Schema(pa_schema, primary_keys=['f0'],
+ options={'bucket': '1', 'file.format': _random_format()}),
+ False)
+
+ df = pd.DataFrame({
+ 'f0': [1, 2, 3, 4, 5],
+ 'f1': ['abc', 'abbc', 'bc', 'd', None],
+ })
+
+ append_table = catalog.get_table('default.test_append')
+ write_builder = append_table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_pandas(df)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ pk_table = catalog.get_table('default.test_pk')
+ write_builder = pk_table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_pandas(df)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ cls.catalog = catalog
+ cls.df = df
+
+ @classmethod
+ def tearDownClass(cls):
+ java_setuputils.clean()
+ if os.path.exists(cls.hadoop_path):
+ shutil.rmtree(cls.hadoop_path)
+ if os.path.exists(cls.warehouse):
+ shutil.rmtree(cls.warehouse)
+
+ def testWrongFieldName(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ with self.assertRaises(ValueError) as e:
+ predicate_builder.equal('f2', 'a')
+ self.assertEqual(str(e.exception), "The field f2 is not in field list ['f0', 'f1'].")
+
+ def testAppendWithDuplicate(self):
+ pa_schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string()),
+ ])
+ self.catalog.create_table('default.test_append_with_duplicate', Schema(pa_schema), False)
+
+ df = pd.DataFrame({
+ 'f0': [1, 1, 2, 2],
+ 'f1': ['a', 'b', 'c', 'd'],
+ })
+
+ table = self.catalog.get_table('default.test_append_with_duplicate')
+ write_builder = table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_pandas(df)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+
+ predicate = predicate_builder.equal('f0', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[0:1])
+
+ predicate = predicate_builder.equal('f0', 0)
+ read_builder = table.new_read_builder().with_filter(predicate)
+ scan = read_builder.new_scan()
+ read = read_builder.new_read()
+ actual_df = read.to_pandas(scan.plan().splits())
+ self.assertEqual(len(actual_df), 0)
+
+ def testAllFieldTypesWithEqual(self):
+ pa_schema = pa.schema([
+ # int
+ ('_tinyint', pa.int8()),
+ ('_smallint', pa.int16()),
+ ('_int', pa.int32()),
+ ('_bigint', pa.int64()),
+ # float
+ ('_float16', pa.float32()), # NOTE: cannot write pa.float16() data into Paimon
+ ('_float32', pa.float32()),
+ ('_double', pa.float64()),
+ # string
+ ('_string', pa.string()),
+ # bool
+ ('_boolean', pa.bool_())
+ ])
+ self.catalog.create_table('default.test_all_field_types',
+ Schema(pa_schema, options={'file.format': _random_format()}),
+ False)
+ table = self.catalog.get_table('default.test_all_field_types')
+ write_builder = table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+
+ df = pd.DataFrame({
+ '_tinyint': pd.Series([1, 2], dtype='int8'),
+ '_smallint': pd.Series([10, 20], dtype='int16'),
+ '_int': pd.Series([100, 200], dtype='int32'),
+ '_bigint': pd.Series([1000, 2000], dtype='int64'),
+ '_float16': pd.Series([1.0, 2.0], dtype='float16'),
+ '_float32': pd.Series([1.00, 2.00], dtype='float32'),
+ '_double': pd.Series([1.000, 2.000], dtype='double'),
+ '_string': pd.Series(['A', 'B'], dtype='object'),
+ '_boolean': [True, False]
+ })
+ record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
+ # prepare for assertion
+ df['_float16'] = df['_float16'].astype('float32')
+
+ write.write_arrow_batch(record_batch)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+
+ predicate = predicate_builder.equal('_tinyint', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_smallint', 20)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_int', 100)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_bigint', 2000)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_float16', 1.0)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_float32', 2.00)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_double', 1.000)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_string', 'B')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_boolean', True)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ def testEqualPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.equal('f0', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0]])
+
+ def testNotEqualAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.not_equal('f0', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
+
+ def testNotEqualPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.not_equal('f0', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
+
+ def testLessThanAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.less_than('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testLessThanPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.less_than('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testLessOrEqualAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.less_or_equal('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testLessOrEqualPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.less_or_equal('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testGreaterThanAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.greater_than('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
+
+ def testGreaterThanPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.greater_than('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
+
+ def testGreaterOrEqualAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.greater_or_equal('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+ def testGreaterOrEqualPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.greater_or_equal('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+ def testIsNullAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_null('f1')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
+
+ def testIsNullPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_null('f1')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
+
+ def testIsNotNullAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_not_null('f1')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
+
+ def testIsNotNullPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_not_null('f1')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
+
+ def testStartswithAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.startswith('f1', 'ab')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testStartswithPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.startswith('f1', 'ab')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testEndswithAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.endswith('f1', 'bc')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testEndswithPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.endswith('f1', 'bc')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testContainsAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.contains('f1', 'bb')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
+
+ def testContainsPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.contains('f1', 'bb')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
+
+ def testIsInAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_in('f0', [1, 2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testIsInPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_in('f1', ['abc', 'd'])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0, 3]])
+
+ def testIsNotInAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_not_in('f0', [1, 2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+ def testIsNotInPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_not_in('f1', ['abc', 'd'])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2])
+
+ def testBetweenAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.between('f0', 1, 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testBetweenPk(self):
+ table = self.catalog.get_table('default.test_pk')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.between('f0', 1, 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testAndPredicates(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate1 = predicate_builder.greater_than('f0', 1)
+ predicate2 = predicate_builder.startswith('f1', 'ab')
+ predicate = predicate_builder.and_predicates([predicate1, predicate2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
+
+ def testOrPredicates(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate1 = predicate_builder.greater_than('f0', 3)
+ predicate2 = predicate_builder.less_than('f0', 2)
+ predicate = predicate_builder.or_predicates([predicate1, predicate2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate),
+ self.df.loc[[0, 3, 4]])
diff --git a/paimon_python_java/tests/utils.py b/paimon_python_java/tests/utils.py
index 2af2fe0..350f80e 100644
--- a/paimon_python_java/tests/utils.py
+++ b/paimon_python_java/tests/utils.py
@@ -23,6 +23,11 @@ from paimon_python_java.util import constants
def setup_hadoop_bundle_jar(hadoop_dir):
+ if constants.PYPAIMON_HADOOP_CLASSPATH in os.environ:
+ file = os.environ[constants.PYPAIMON_HADOOP_CLASSPATH]
+ if os.path.isfile(file):
+ return
+
url = 'https://repo.maven.apache.org/maven2/org/apache/flink/' \
'flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar'