This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 93b8bcd308 [python] Refactor Test to standardize naming and structure (#6252)
93b8bcd308 is described below
commit 93b8bcd308b41e71768f8359034b2828f4e213f6
Author: umi <[email protected]>
AuthorDate: Mon Sep 15 12:01:57 2025 +0800
[python] Refactor Test to standardize naming and structure (#6252)
---
...e_store_commit.py => file_store_commit_test.py} | 0
.../pypaimon/tests/filesystem_catalog_test.py | 2 +-
.../pypaimon/tests/predicate_push_down_test.py | 151 ---------------
paimon-python/pypaimon/tests/predicates_test.py | 100 ++++++++++
paimon-python/pypaimon/tests/pvfs_test.py | 4 +-
.../pypaimon/tests/py36/ao_predicate_test.py | 30 +--
...ad_write_test.py => rest_ao_read_write_test.py} | 23 +--
.../pypaimon/tests/reader_append_only_test.py | 16 +-
.../{reader_basic_test.py => reader_base_test.py} | 208 ++++++++++++++++-----
.../pypaimon/tests/reader_primary_key_test.py | 12 +-
paimon-python/pypaimon/tests/rest/__init__.py | 17 ++
.../pypaimon/tests/{ => rest}/api_test.py | 4 +-
.../rest_base_test.py} | 4 +-
.../rest_catalog_commit_snapshot_test.py} | 0
.../rest_read_write_test.py} | 36 ++--
.../pypaimon/tests/{ => rest}/rest_server.py | 0
.../rest_simple_test.py} | 4 +-
paimon-python/pypaimon/tests/schema_test.py | 79 --------
paimon-python/pypaimon/tests/writer_test.py | 94 ----------
19 files changed, 343 insertions(+), 441 deletions(-)
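
For reference, the standardized *_test.py suffix lines up with plain
unittest discovery. A minimal sketch (the start directory and runner
invocation here are illustrative, not part of this commit):

    import unittest

    # Collect every module matching *_test.py under the tests package.
    suite = unittest.defaultTestLoader.discover(
        start_dir="paimon-python/pypaimon/tests",
        pattern="*_test.py",
    )
    unittest.TextTestRunner(verbosity=2).run(suite)
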
diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
similarity index 100%
rename from paimon-python/pypaimon/tests/test_file_store_commit.py
rename to paimon-python/pypaimon/tests/file_store_commit_test.py
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index 0e9900efb4..d6b9433cba 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -29,7 +29,7 @@ from pypaimon import Schema
from pypaimon.table.file_store_table import FileStoreTable
-class FileSystemCatalogTestCase(unittest.TestCase):
+class FileSystemCatalogTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
diff --git a/paimon-python/pypaimon/tests/predicate_push_down_test.py b/paimon-python/pypaimon/tests/predicate_push_down_test.py
deleted file mode 100644
index e809295d57..0000000000
--- a/paimon-python/pypaimon/tests/predicate_push_down_test.py
+++ /dev/null
@@ -1,151 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import os
-import shutil
-import tempfile
-import unittest
-
-import pyarrow as pa
-
-from pypaimon import CatalogFactory
-from pypaimon.common.predicate_builder import PredicateBuilder
-from pypaimon import Schema
-
-
-class PredicatePushDownTest(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- cls.tempdir = tempfile.mkdtemp()
- cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
- cls.catalog = CatalogFactory.create({
- 'warehouse': cls.warehouse
- })
- cls.catalog.create_database('default', False)
-
- cls.pa_schema = pa.schema([
- pa.field('key1', pa.int32(), nullable=False),
- pa.field('key2', pa.string(), nullable=False),
- ('behavior', pa.string()),
- pa.field('dt1', pa.string(), nullable=False),
- pa.field('dt2', pa.int32(), nullable=False)
- ])
- cls.expected = pa.Table.from_pydict({
- 'key1': [1, 2, 3, 4, 5, 7, 8],
- 'key2': ['h', 'g', 'f', 'e', 'd', 'b', 'a'],
- 'behavior': ['a', 'b-new', 'c', None, 'e', 'g', 'h'],
- 'dt1': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
- 'dt2': [2, 2, 1, 2, 2, 1, 2],
- }, schema=cls.pa_schema)
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.tempdir, ignore_errors=True)
-
- def testPkReaderWithFilter(self):
- schema = Schema.from_pyarrow_schema(self.pa_schema,
- partition_keys=['dt1', 'dt2'],
- primary_keys=['key1', 'key2'],
- options={'bucket': '1'})
- self.catalog.create_table('default.test_pk_filter', schema, False)
- table = self.catalog.get_table('default.test_pk_filter')
-
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- data1 = {
- 'key1': [1, 2, 3, 4],
- 'key2': ['h', 'g', 'f', 'e'],
- 'behavior': ['a', 'b', 'c', None],
- 'dt1': ['p1', 'p1', 'p2', 'p1'],
- 'dt2': [2, 2, 1, 2],
- }
- pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
- table_write.write_arrow(pa_table)
- table_commit.commit(table_write.prepare_commit())
- table_write.close()
- table_commit.close()
-
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- data1 = {
- 'key1': [5, 2, 7, 8],
- 'key2': ['d', 'g', 'b', 'a'],
- 'behavior': ['e', 'b-new', 'g', 'h'],
- 'dt1': ['p2', 'p1', 'p1', 'p2'],
- 'dt2': [2, 2, 1, 2]
- }
- pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
- table_write.write_arrow(pa_table)
- table_commit.commit(table_write.prepare_commit())
- table_write.close()
- table_commit.close()
-
- # test filter by partition
- predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder()
- p1 = predicate_builder.startswith('dt1', "p1")
- p2 = predicate_builder.is_in('dt1', ["p2"])
- p3 = predicate_builder.or_predicates([p1, p2])
- p4 = predicate_builder.equal('dt2', 2)
- g1 = predicate_builder.and_predicates([p3, p4])
- # (dt1 startswith 'p1' or dt1 is_in ["p2"]) and dt2 == 2
- read_builder = table.new_read_builder().with_filter(g1)
- splits = read_builder.new_scan().plan().splits()
- self.assertEqual(len(splits), 2)
- self.assertEqual(splits[0].partition.to_dict()["dt2"], 2)
- self.assertEqual(splits[1].partition.to_dict()["dt2"], 2)
-
- # test filter by stats
- predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder()
- p1 = predicate_builder.equal('key1', 7)
- p2 = predicate_builder.is_in('key2', ["e", "f"])
- p3 = predicate_builder.or_predicates([p1, p2])
- p4 = predicate_builder.greater_than('key1', 3)
- g1 = predicate_builder.and_predicates([p3, p4])
- # (key1 == 7 or key2 is_in ["e", "f"]) and key1 > 3
- read_builder = table.new_read_builder().with_filter(g1)
- splits = read_builder.new_scan().plan().splits()
- # initial splits meta:
- # p1, 2 -> 2g, 2g; 1e, 4h
- # p2, 1 -> 3f, 3f
- # p2, 2 -> 5a, 8d
- # p1, 1 -> 7b, 7b
- self.assertEqual(len(splits), 3)
- # expect to filter out `p1, 2 -> 2g, 2g` and `p2, 1 -> 3f, 3f`
- count = 0
- for split in splits:
- if split.partition.values == ["p1", 2]:
- count += 1
- self.assertEqual(len(split.files), 1)
- min_values = split.files[0].value_stats.min_values.to_dict()
- max_values = split.files[0].value_stats.max_values.to_dict()
- self.assertTrue(min_values["key1"] == 1 and min_values["key2"]
== "e"
- and max_values["key1"] == 4 and
max_values["key2"] == "h")
- elif split.partition.values == ["p2", 2]:
- count += 1
- min_values = split.files[0].value_stats.min_values.to_dict()
- max_values = split.files[0].value_stats.max_values.to_dict()
- self.assertTrue(min_values["key1"] == 5 and min_values["key2"]
== "a"
- and max_values["key1"] == 8 and
max_values["key2"] == "d")
- elif split.partition.values == ["p1", 1]:
- count += 1
- min_values = split.files[0].value_stats.min_values.to_dict()
- max_values = split.files[0].value_stats.max_values.to_dict()
- self.assertTrue(min_values["key1"] == max_values["key1"] == 7
- and max_values["key2"] == max_values["key2"]
== "b")
- self.assertEqual(count, 3)
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index ca3caa5276..0ddebc3451 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -372,3 +372,103 @@ class PredicateTest(unittest.TestCase):
predicate = predicate_builder.or_predicates([predicate1, predicate2])
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[[0, 3, 4]])
+
+ def testPkReaderWithFilter(self):
+ pa_schema = pa.schema([
+ pa.field('key1', pa.int32(), nullable=False),
+ pa.field('key2', pa.string(), nullable=False),
+ ('behavior', pa.string()),
+ pa.field('dt1', pa.string(), nullable=False),
+ pa.field('dt2', pa.int32(), nullable=False)
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema,
+ partition_keys=['dt1', 'dt2'],
+ primary_keys=['key1', 'key2'],
+ options={'bucket': '1'})
+ self.catalog.create_table('default.test_pk_filter', schema, False)
+ table = self.catalog.get_table('default.test_pk_filter')
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'key1': [1, 2, 3, 4],
+ 'key2': ['h', 'g', 'f', 'e'],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt1': ['p1', 'p1', 'p2', 'p1'],
+ 'dt2': [2, 2, 1, 2],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'key1': [5, 2, 7, 8],
+ 'key2': ['d', 'g', 'b', 'a'],
+ 'behavior': ['e', 'b-new', 'g', 'h'],
+ 'dt1': ['p2', 'p1', 'p1', 'p2'],
+ 'dt2': [2, 2, 1, 2]
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # test filter by partition
+ predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.startswith('dt1', "p1")
+ p2 = predicate_builder.is_in('dt1', ["p2"])
+ p3 = predicate_builder.or_predicates([p1, p2])
+ p4 = predicate_builder.equal('dt2', 2)
+ g1 = predicate_builder.and_predicates([p3, p4])
+ # (dt1 startswith 'p1' or dt1 is_in ["p2"]) and dt2 == 2
+ read_builder = table.new_read_builder().with_filter(g1)
+ splits = read_builder.new_scan().plan().splits()
+ self.assertEqual(len(splits), 2)
+ self.assertEqual(splits[0].partition.to_dict()["dt2"], 2)
+ self.assertEqual(splits[1].partition.to_dict()["dt2"], 2)
+
+ # test filter by stats
+ predicate_builder: PredicateBuilder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.equal('key1', 7)
+ p2 = predicate_builder.is_in('key2', ["e", "f"])
+ p3 = predicate_builder.or_predicates([p1, p2])
+ p4 = predicate_builder.greater_than('key1', 3)
+ g1 = predicate_builder.and_predicates([p3, p4])
+ # (key1 == 7 or key2 is_in ["e", "f"]) and key1 > 3
+ read_builder = table.new_read_builder().with_filter(g1)
+ splits = read_builder.new_scan().plan().splits()
+ # initial splits meta:
+ # p1, 2 -> 2g, 2g; 1e, 4h
+ # p2, 1 -> 3f, 3f
+ # p2, 2 -> 5a, 8d
+ # p1, 1 -> 7b, 7b
+ self.assertEqual(len(splits), 3)
+ # expect to filter out `p1, 2 -> 2g, 2g` and `p2, 1 -> 3f, 3f`
+ count = 0
+ for split in splits:
+ if split.partition.values == ["p1", 2]:
+ count += 1
+ self.assertEqual(len(split.files), 1)
+ min_values = split.files[0].value_stats.min_values.to_dict()
+ max_values = split.files[0].value_stats.max_values.to_dict()
+ self.assertTrue(min_values["key1"] == 1 and min_values["key2"]
== "e"
+ and max_values["key1"] == 4 and
max_values["key2"] == "h")
+ elif split.partition.values == ["p2", 2]:
+ count += 1
+ min_values = split.files[0].value_stats.min_values.to_dict()
+ max_values = split.files[0].value_stats.max_values.to_dict()
+ self.assertTrue(min_values["key1"] == 5 and min_values["key2"]
== "a"
+ and max_values["key1"] == 8 and
max_values["key2"] == "d")
+ elif split.partition.values == ["p1", 1]:
+ count += 1
+ min_values = split.files[0].value_stats.min_values.to_dict()
+ max_values = split.files[0].value_stats.max_values.to_dict()
+ self.assertTrue(min_values["key1"] == max_values["key1"] == 7
+ and max_values["key2"] == max_values["key2"]
== "b")
+ self.assertEqual(count, 3)
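
For reference, the relocated testPkReaderWithFilter drives predicate
pushdown end to end. A minimal sketch of the composition it builds,
assuming a table obtained from a pypaimon catalog as in the test:

    builder = table.new_read_builder().new_predicate_builder()
    # (dt1 startswith 'p1' or dt1 in ('p2')) and dt2 == 2
    predicate = builder.and_predicates([
        builder.or_predicates([
            builder.startswith('dt1', 'p1'),
            builder.is_in('dt1', ['p2']),
        ]),
        builder.equal('dt2', 2),
    ])
    read_builder = table.new_read_builder().with_filter(predicate)
    splits = read_builder.new_scan().plan().splits()  # pruned by partition/stats
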
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py
index 4aa17706ee..89c96a6228 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -29,10 +29,10 @@ from pypaimon.api.auth import BearTokenAuthProvider
from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.schema.table_schema import TableSchema
-from pypaimon.tests.api_test import RESTCatalogServer
+from pypaimon.tests.rest.api_test import RESTCatalogServer
-class PVFSTestCase(unittest.TestCase):
+class PVFSTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
index 5e7435a360..92a3e9601a 100644
--- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -58,14 +58,14 @@ class PredicatePy36Test(unittest.TestCase):
cls.df = df
- def testWrongFieldName(self):
+ def test_wrong_field_name(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
with self.assertRaises(ValueError) as e:
predicate_builder.equal('f2', 'a')
self.assertEqual(str(e.exception), "The field f2 is not in field list
['f0', 'f1'].")
- def testAppendWithDuplicate(self):
+ def test_append_with_duplicate(self):
pa_schema = pa.schema([
('f0', pa.int64()),
('f1', pa.string()),
@@ -98,7 +98,7 @@ class PredicatePy36Test(unittest.TestCase):
actual_df = read.to_pandas(scan.plan().splits())
self.assertEqual(len(actual_df), 0)
- def testAllFieldTypesWithEqual(self):
+ def test_all_field_types_with_equal(self):
pa_schema = pa.schema([
# int
('_tinyint', pa.int8()),
@@ -169,67 +169,67 @@ class PredicatePy36Test(unittest.TestCase):
predicate = predicate_builder.equal('_boolean', True)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
df.loc[[0]])
- def testNotEqualAppend(self):
+ def test_not_equal_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.not_equal('f0', 1)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[1:4])
- def testLessThanAppend(self):
+ def test_less_than_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.less_than('f0', 3)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[0:1])
- def testLessOrEqualAppend(self):
+ def test_less_or_equal_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.less_or_equal('f0', 3)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[0:2])
- def testGreaterThanAppend(self):
+ def test_greater_than_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.greater_than('f0', 3)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[3:4])
- def testGreaterOrEqualAppend(self):
+ def test_greater_or_equal_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.greater_or_equal('f0', 3)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[2:4])
- def testIsNullAppend(self):
+ def test_is_null_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.is_null('f1')
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[[4]])
- def testIsNotNullAppend(self):
+ def test_is_not_null_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.is_not_null('f1')
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[0:3])
- def testIsInAppend(self):
+ def test_is_in_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.is_in('f0', [1, 2])
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[0:1])
- def testIsNotInAppend(self):
+ def test_is_not_in_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.is_not_in('f0', [1, 2])
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[2:4])
- def testBetweenAppend(self):
+ def test_between_append(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.between('f0', 1, 3)
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[0:2])
- def testAndPredicates(self):
+ def test_and_predicates(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate1 = predicate_builder.greater_than('f0', 1)
@@ -237,7 +237,7 @@ class PredicatePy36Test(unittest.TestCase):
predicate = predicate_builder.and_predicates([predicate1, predicate2])
_check_filtered_result(table.new_read_builder().with_filter(predicate),
self.df.loc[1:2])
- def testOrPredicates(self):
+ def test_or_predicates(self):
table = self.catalog.get_table('default.test_append')
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate1 = predicate_builder.greater_than('f0', 3)
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
similarity index 98%
rename from paimon-python/pypaimon/tests/py36/ao_read_write_test.py
rename to paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index fcd05ee6cf..20e6a2c2d8 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -38,11 +38,12 @@ from pypaimon import Schema
from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer
from pypaimon.table.row.row_kind import RowKind
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
-from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
+
from pypaimon.write.file_store_commit import FileStoreCommit
-class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
+class RESTReadWritePy36Test(RESTBaseTest):
def test_overwrite(self):
simple_pa_schema = pa.schema([
@@ -304,7 +305,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
self.assertEqual(south_stat.file_count, -1)
self.assertEqual(south_stat.file_size_in_bytes, -750)
- def testParquetAppendOnlyReader(self):
+ def test_parquet_append_only_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_parquet')
@@ -314,7 +315,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
self.assertEqual(actual, self.expected)
- def testOrcAppendOnlyReader(self):
+ def test_orc_append_only_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'})
self.rest_catalog.create_table('default.test_append_only_orc', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_orc')
@@ -324,7 +325,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
self.assertEqual(actual, self.expected)
- def testAvroAppendOnlyReader(self):
+ def test_avro_append_only_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
self.rest_catalog.create_table('default.test_append_only_avro', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_avro')
@@ -444,7 +445,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
result = table_read.to_pandas(table_scan.plan().splits())
self.assertEqual(result.to_dict(), test_df.to_dict())
- def testAppendOnlyReaderWithFilter(self):
+ def test_append_only_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_filter')
@@ -493,7 +494,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
])
self.assertEqual(table_sort_by(actual, 'user_id'), expected)
- def testAppendOnlyReaderWithProjection(self):
+ def test_append_only_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_projection', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_projection')
@@ -504,7 +505,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
- def testAvroAppendOnlyReaderWithProjection(self):
+ def test_avro_append_only_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False)
table = self.rest_catalog.get_table('default.test_avro_append_only_projection')
@@ -515,7 +516,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
- def testAppendOnlyReaderWithLimit(self):
+ def test_append_only_reader_with_limit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_limit', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_limit')
@@ -527,7 +528,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
# might be split of "dt=1" or split of "dt=2"
self.assertEqual(actual.num_rows, 4)
- def testWriteWrongSchema(self):
+ def test_write_wrong_schema(self):
self.rest_catalog.create_table('default.test_wrong_schema',
Schema.from_pyarrow_schema(self.pa_schema),
False)
@@ -551,7 +552,7 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
table_write.write_arrow_batch(record_batch)
self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
- def testWriteWideTableLargeData(self):
+ def test_write_wide_table_large_data(self):
logging.basicConfig(level=logging.INFO)
catalog = CatalogFactory.create(self.options)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 795365e48e..db0cbcccd1 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -51,7 +51,7 @@ class AoReaderTest(unittest.TestCase):
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2'],
}, schema=cls.pa_schema)
- def testParquetAppendOnlyReader(self):
+ def test_parquet_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_parquet', schema, False)
table = self.catalog.get_table('default.test_append_only_parquet')
@@ -61,7 +61,7 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testOrcAppendOnlyReader(self):
+ def test_orc_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'})
self.catalog.create_table('default.test_append_only_orc', schema, False)
table = self.catalog.get_table('default.test_append_only_orc')
@@ -71,7 +71,7 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testAvroAppendOnlyReader(self):
+ def test_avro_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
self.catalog.create_table('default.test_append_only_avro', schema, False)
table = self.catalog.get_table('default.test_append_only_avro')
@@ -115,7 +115,7 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def test_over1000cols_read(self):
+ def test_over_1000_cols_read(self):
num_rows = 1
num_cols = 10
table_name = "default.testBug"
@@ -189,7 +189,7 @@ class AoReaderTest(unittest.TestCase):
result = table_read.to_pandas(table_scan.plan().splits())
self.assertEqual(result.to_dict(), test_df.to_dict())
- def testAppendOnlyReaderWithFilter(self):
+ def test_ao_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_filter', schema, False)
table = self.catalog.get_table('default.test_append_only_filter')
@@ -243,7 +243,7 @@ class AoReaderTest(unittest.TestCase):
])
self.assertEqual(actual.sort_by('user_id'), expected)
- def testAppendOnlyReaderWithProjection(self):
+ def test_ao_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_projection', schema, False)
table = self.catalog.get_table('default.test_append_only_projection')
@@ -254,7 +254,7 @@ class AoReaderTest(unittest.TestCase):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
- def testAvroAppendOnlyReaderWithProjection(self):
+ def test_avro_ao_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
self.catalog.create_table('default.test_avro_append_only_projection', schema, False)
table = self.catalog.get_table('default.test_avro_append_only_projection')
@@ -265,7 +265,7 @@ class AoReaderTest(unittest.TestCase):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
- def testAppendOnlyReaderWithLimit(self):
+ def test_ao_reader_with_limit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_limit', schema, False)
table = self.catalog.get_table('default.test_append_only_limit')
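
These reader tests repeat one setup per file format. Condensed to a
sketch, assuming catalog and pa_schema as defined in the test class:

    from pypaimon import Schema

    # file.format may be 'orc' or 'avro'; parquet is the default.
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        partition_keys=['dt'],
        options={'file.format': 'orc'},
    )
    catalog.create_table('default.test_append_only_orc', schema, False)
    table = catalog.get_table('default.test_append_only_orc')
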
diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
similarity index 79%
rename from paimon-python/pypaimon/tests/reader_basic_test.py
rename to paimon-python/pypaimon/tests/reader_base_test.py
index e66f7cb94a..ccf06d5597 100644
--- a/paimon-python/pypaimon/tests/reader_basic_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -17,6 +17,7 @@
################################################################################
import os
+import glob
import shutil
import tempfile
import unittest
@@ -29,8 +30,9 @@ import pyarrow as pa
from pypaimon.table.row.generic_row import GenericRow
-from pypaimon.schema.data_types import DataField, AtomicType
-
+from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
+ MapType, PyarrowFieldParser)
+from pypaimon.schema.table_schema import TableSchema
from pypaimon import CatalogFactory
from pypaimon import Schema
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
@@ -224,7 +226,52 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(min_value_stats, expected_min_values)
self.assertEqual(max_value_stats, expected_max_values)
- def test_mixed_add_and_delete_entries_same_partition(self):
+ def test_write_wrong_schema(self):
+ self.catalog.create_table('default.test_wrong_schema',
+ Schema.from_pyarrow_schema(self.pa_schema),
+ False)
+ table = self.catalog.get_table('default.test_wrong_schema')
+
+ data = {
+ 'f0': [1, 2, 3],
+ 'f1': ['a', 'b', 'c'],
+ }
+ df = pd.DataFrame(data)
+ schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string())
+ ])
+ record_batch = pa.RecordBatch.from_pandas(df, schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+
+ with self.assertRaises(ValueError) as e:
+ table_write.write_arrow_batch(record_batch)
+ self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
+
+ def test_reader_iterator(self):
+ read_builder = self.table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ iterator = table_read.to_iterator(splits)
+ result = []
+ value = next(iterator, None)
+ while value is not None:
+ result.append(value.get_field(1))
+ value = next(iterator, None)
+ self.assertEqual(result, [1001, 1002, 1003, 1004, 1005])
+
+ def test_reader_duckDB(self):
+ read_builder = self.table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
+ actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
+ expect = pd.DataFrame(self.raw_data)
+ pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True))
+
+ def test_mixed_add_and_delete_entries_compute_stats(self):
"""Test record_count calculation with mixed ADD/DELETE entries in same
partition."""
pa_schema = pa.schema([
('region', pa.string()),
@@ -279,7 +326,7 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(stat.file_count, 0)
self.assertEqual(stat.file_size_in_bytes, 1248)
- def test_multiple_partitions_with_different_operations(self):
+ def test_delete_entries_compute_stats(self):
"""Test record_count calculation across multiple partitions."""
pa_schema = pa.schema([
('region', pa.string()),
@@ -343,52 +390,7 @@ class ReaderBasicTest(unittest.TestCase):
self.assertEqual(south_stat.file_count, -1)
self.assertEqual(south_stat.file_size_in_bytes, -750)
- def testWriteWrongSchema(self):
- self.catalog.create_table('default.test_wrong_schema',
- Schema.from_pyarrow_schema(self.pa_schema),
- False)
- table = self.catalog.get_table('default.test_wrong_schema')
-
- data = {
- 'f0': [1, 2, 3],
- 'f1': ['a', 'b', 'c'],
- }
- df = pd.DataFrame(data)
- schema = pa.schema([
- ('f0', pa.int64()),
- ('f1', pa.string())
- ])
- record_batch = pa.RecordBatch.from_pandas(df, schema)
-
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
-
- with self.assertRaises(ValueError) as e:
- table_write.write_arrow_batch(record_batch)
- self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
-
- def testReaderIterator(self):
- read_builder = self.table.new_read_builder()
- table_read = read_builder.new_read()
- splits = read_builder.new_scan().plan().splits()
- iterator = table_read.to_iterator(splits)
- result = []
- value = next(iterator, None)
- while value is not None:
- result.append(value.get_field(1))
- value = next(iterator, None)
- self.assertEqual(result, [1001, 1002, 1003, 1004, 1005])
-
- def testReaderDuckDB(self):
- read_builder = self.table.new_read_builder()
- table_read = read_builder.new_read()
- splits = read_builder.new_scan().plan().splits()
- duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
- actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
- expect = pd.DataFrame(self.raw_data)
- pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True))
-
- def test_value_stats_cols_logic(self):
+ def test_value_stats_cols_param(self):
"""Test _VALUE_STATS_COLS logic in ManifestFileManager."""
# Create a catalog and table
catalog = CatalogFactory.create({
@@ -437,6 +439,112 @@ class ReaderBasicTest(unittest.TestCase):
test_name="specific_case"
)
+ def test_types(self):
+ data_fields = [
+ DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
+ DataField(1, "f1", AtomicType('SMALLINT'), 'desc'),
+ DataField(2, "f2", AtomicType('INT'), 'desc'),
+ DataField(3, "f3", AtomicType('BIGINT'), 'desc'),
+ DataField(4, "f4", AtomicType('FLOAT'), 'desc'),
+ DataField(5, "f5", AtomicType('DOUBLE'), 'desc'),
+ DataField(6, "f6", AtomicType('BOOLEAN'), 'desc'),
+ DataField(7, "f7", AtomicType('STRING'), 'desc'),
+ DataField(8, "f8", AtomicType('BINARY(12)'), 'desc'),
+ DataField(9, "f9", AtomicType('DECIMAL(10, 6)'), 'desc'),
+ DataField(10, "f10", AtomicType('BYTES'), 'desc'),
+ DataField(11, "f11", AtomicType('DATE'), 'desc'),
+ DataField(12, "f12", AtomicType('TIME(0)'), 'desc'),
+ DataField(13, "f13", AtomicType('TIME(3)'), 'desc'),
+ DataField(14, "f14", AtomicType('TIME(6)'), 'desc'),
+ DataField(15, "f15", AtomicType('TIME(9)'), 'desc'),
+ DataField(16, "f16", AtomicType('TIMESTAMP(0)'), 'desc'),
+ DataField(17, "f17", AtomicType('TIMESTAMP(3)'), 'desc'),
+ DataField(18, "f18", AtomicType('TIMESTAMP(6)'), 'desc'),
+ DataField(19, "f19", AtomicType('TIMESTAMP(9)'), 'desc'),
+ DataField(20, "arr", ArrayType(True, AtomicType('INT')), 'desc
arr1'),
+ DataField(21, "map1",
+ MapType(False, AtomicType('INT', False),
+ MapType(False, AtomicType('INT', False), AtomicType('INT', False))),
+ 'desc map1'),
+ ]
+ table_schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields,
+ max(field.id for field in data_fields),
+ [], [], {}, "")
+ pa_fields = []
+ for field in table_schema.fields:
+ pa_field = PyarrowFieldParser.from_paimon_field(field)
+ pa_fields.append(pa_field)
+ schema = Schema.from_pyarrow_schema(
+ pa_schema=pa.schema(pa_fields),
+ partition_keys=table_schema.partition_keys,
+ primary_keys=table_schema.primary_keys,
+ options=table_schema.options,
+ comment=table_schema.comment
+ )
+ table_schema2 = TableSchema.from_schema(len(data_fields), schema)
+ l1 = []
+ for field in table_schema.fields:
+ l1.append(field.to_dict())
+ l2 = []
+ for field in table_schema2.fields:
+ l2.append(field.to_dict())
+ self.assertEqual(l1, l2)
+
+ def test_write(self):
+ pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string())
+ ])
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_write_db", False)
+ catalog.create_table("test_write_db.test_table",
Schema.from_pyarrow_schema(pa_schema), False)
+ table = catalog.get_table("test_write_db.test_table")
+
+ data = {
+ 'f0': [1, 2, 3],
+ 'f1': ['a', 'b', 'c'],
+ 'f2': ['X', 'Y', 'Z']
+ }
+ expect = pa.Table.from_pydict(data, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/snapshot/LATEST"))
+ self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/snapshot/snapshot-1"))
+ self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/manifest"))
+ self.assertTrue(os.path.exists(self.warehouse + "/test_write_db.db/test_table/bucket-0"))
+ self.assertEqual(len(glob.glob(self.warehouse + "/test_write_db.db/test_table/manifest/*")), 3)
+ self.assertEqual(len(glob.glob(self.warehouse + "/test_write_db.db/test_table/bucket-0/*.parquet")), 1)
+
+ with open(self.warehouse + '/test_write_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
+ content = ''.join(file.readlines())
+ self.assertTrue(content.__contains__('\"totalRecordCount\": 3'))
+ self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ with open(self.warehouse + '/test_write_db.db/test_table/snapshot/snapshot-2', 'r', encoding='utf-8') as file:
+ content = ''.join(file.readlines())
+ self.assertTrue(content.__contains__('\"totalRecordCount\": 6'))
+ self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))
+
def _test_value_stats_cols_case(self, manifest_manager, table,
value_stats_cols, expected_fields_count, test_name):
"""Helper method to test a specific _VALUE_STATS_COLS case."""
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index d1a565d4cc..b992595fc9 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -54,7 +54,7 @@ class PkReaderTest(unittest.TestCase):
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)
- def testPkParquetReader(self):
+ def test_pk_parquet_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -67,7 +67,7 @@ class PkReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testPkOrcReader(self):
+ def test_pk_orc_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -88,7 +88,7 @@ class PkReaderTest(unittest.TestCase):
col_b = self.expected.column(i)
self.assertEqual(col_a, col_b)
- def testPkAvroReader(self):
+ def test_pk_avro_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -148,7 +148,7 @@ class PkReaderTest(unittest.TestCase):
}, schema=self.pa_schema)
self.assertEqual(actual, expected)
- def testPkReaderWithFilter(self):
+ def test_pk_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -166,11 +166,11 @@ class PkReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
expected = pa.concat_tables([
self.expected.slice(1, 1), # 2/b
- self.expected.slice(5, 1) # 7/g
+ self.expected.slice(5, 1) # 7/g
])
self.assertEqual(actual, expected)
- def testPkReaderWithProjection(self):
+ def test_pk_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
diff --git a/paimon-python/pypaimon/tests/rest/__init__.py
b/paimon-python/pypaimon/tests/rest/__init__.py
new file mode 100644
index 0000000000..53ed4d36c2
--- /dev/null
+++ b/paimon-python/pypaimon/tests/rest/__init__.py
@@ -0,0 +1,17 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/paimon-python/pypaimon/tests/api_test.py b/paimon-python/pypaimon/tests/rest/api_test.py
similarity index 98%
rename from paimon-python/pypaimon/tests/api_test.py
rename to paimon-python/pypaimon/tests/rest/api_test.py
index c63b912635..374765cea7 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/rest/api_test.py
@@ -31,10 +31,10 @@ from pypaimon.schema.data_types import (ArrayType, AtomicInteger, AtomicType,
DataField, DataTypeParser, MapType,
RowType)
from pypaimon.schema.table_schema import TableSchema
-from pypaimon.tests.rest_server import RESTCatalogServer
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
-class ApiTestCase(unittest.TestCase):
+class ApiTest(unittest.TestCase):
def test_parse_data(self):
simple_type_test_cases = [
diff --git a/paimon-python/pypaimon/tests/rest_catalog_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py
similarity index 98%
rename from paimon-python/pypaimon/tests/rest_catalog_base_test.py
rename to paimon-python/pypaimon/tests/rest/rest_base_test.py
index 0dc3a129b7..3a83ccb285 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_base_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py
@@ -37,10 +37,10 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
MapType)
from pypaimon import Schema
from pypaimon.schema.table_schema import TableSchema
-from pypaimon.tests.rest_server import RESTCatalogServer
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
-class RESTCatalogBaseTest(unittest.TestCase):
+class RESTBaseTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
self.warehouse = os.path.join(self.temp_dir, 'warehouse')
diff --git a/paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
similarity index 100%
rename from paimon-python/pypaimon/tests/test_rest_catalog_commit_snapshot.py
rename to paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
similarity index 96%
rename from paimon-python/pypaimon/tests/rest_table_read_write_test.py
rename to paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index b070d39bbf..dc6c47e778 100644
--- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -27,10 +27,10 @@ from pypaimon import CatalogFactory
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.common.identifier import Identifier
from pypaimon import Schema
-from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
-class RESTTableReadWriteTest(RESTCatalogBaseTest):
+class RESTTableReadWriteTest(RESTBaseTest):
def test_overwrite(self):
simple_pa_schema = pa.schema([
@@ -113,7 +113,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
- def testParquetAppendOnlyReader(self):
+ def test_parquet_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_parquet')
@@ -123,7 +123,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testOrcAppendOnlyReader(self):
+ def test_orc_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'})
self.rest_catalog.create_table('default.test_append_only_orc', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_orc')
@@ -133,7 +133,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testAvroAppendOnlyReader(self):
+ def test_avro_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
self.rest_catalog.create_table('default.test_append_only_avro', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_avro')
@@ -143,7 +143,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testAppendOnlyReaderWithFilter(self):
+ def test_ao_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_filter')
@@ -197,7 +197,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
])
self.assertEqual(actual.sort_by('user_id'), expected)
- def testAppendOnlyReaderWithProjection(self):
+ def test_ao_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_projection', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_projection')
@@ -208,7 +208,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
- def testAvroAppendOnlyReaderWithProjection(self):
+ def test_avro_ao_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False)
table = self.rest_catalog.get_table('default.test_avro_append_only_projection')
@@ -219,7 +219,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
- def testAppendOnlyReaderWithLimit(self):
+ def test_ao_reader_with_limit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_limit', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_limit')
@@ -231,7 +231,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
# might be split of "dt=1" or split of "dt=2"
self.assertEqual(actual.num_rows, 4)
- def testPkParquetReader(self):
+ def test_pk_parquet_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -244,7 +244,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testPkOrcReader(self):
+ def test_pk_orc_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -265,7 +265,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
col_b = self.expected.column(i)
self.assertEqual(col_a, col_b)
- def testPkAvroReader(self):
+ def test_pk_avro_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -281,7 +281,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
- def testPkReaderWithFilter(self):
+ def test_pk_reader_with_filter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -303,7 +303,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
])
self.assertEqual(actual, expected)
- def testPkReaderWithProjection(self):
+ def test_pk_reader_with_projection(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
@@ -317,7 +317,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
expected = self.expected.select(['dt', 'user_id', 'behavior'])
self.assertEqual(actual, expected)
- def testWriteWrongSchema(self):
+ def test_write_wrong_schema(self):
self.rest_catalog.create_table('default.test_wrong_schema',
Schema.from_pyarrow_schema(self.pa_schema),
False)
@@ -341,7 +341,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
table_write.write_arrow_batch(record_batch)
self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
- def testReaderIterator(self):
+ def test_reader_iterator(self):
read_builder = self.table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
@@ -353,7 +353,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
value = next(iterator, None)
self.assertEqual(result, [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008])
- def testReaderDuckDB(self):
+ def test_reader_duckdb(self):
read_builder = self.table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
@@ -362,7 +362,7 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
expect = pd.DataFrame(self.raw_data)
pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True))
- def testWriteWideTableLargeData(self):
+ def test_write_wide_table_large_data(self):
logging.basicConfig(level=logging.INFO)
catalog = CatalogFactory.create(self.options)
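
test_reader_iterator and test_reader_duckdb above cover both read
paths. A minimal sketch of the two, assuming table as in the tests:

    read_builder = table.new_read_builder()
    table_read = read_builder.new_read()
    splits = read_builder.new_scan().plan().splits()

    # Row-at-a-time access.
    values = [row.get_field(1) for row in table_read.to_iterator(splits)]

    # Register the splits as a DuckDB relation and query with SQL.
    con = table_read.to_duckdb(splits, 'duckdb_table')
    df = con.query("SELECT * FROM duckdb_table").fetchdf()
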
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest/rest_server.py
similarity index 100%
rename from paimon-python/pypaimon/tests/rest_server.py
rename to paimon-python/pypaimon/tests/rest/rest_server.py
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
similarity index 98%
rename from paimon-python/pypaimon/tests/rest_table_test.py
rename to paimon-python/pypaimon/tests/rest/rest_simple_test.py
index d86046c80c..95a20345b0 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -21,13 +21,13 @@ import os
import pyarrow as pa
from pypaimon import Schema
-from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
FixedBucketRowKeyExtractor,
UnawareBucketRowKeyExtractor)
-class RESTTableTest(RESTCatalogBaseTest):
+class RESTSimpleTest(RESTBaseTest):
def setUp(self):
super().setUp()
self.pa_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/schema_test.py b/paimon-python/pypaimon/tests/schema_test.py
deleted file mode 100644
index 671f837117..0000000000
--- a/paimon-python/pypaimon/tests/schema_test.py
+++ /dev/null
@@ -1,79 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import unittest
-
-import pyarrow
-
-from pypaimon import Schema
-from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
- MapType, PyarrowFieldParser)
-from pypaimon.schema.table_schema import TableSchema
-
-
-class SchemaTestCase(unittest.TestCase):
- def test_types(self):
- data_fields = [
- DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
- DataField(1, "f1", AtomicType('SMALLINT'), 'desc'),
- DataField(2, "f2", AtomicType('INT'), 'desc'),
- DataField(3, "f3", AtomicType('BIGINT'), 'desc'),
- DataField(4, "f4", AtomicType('FLOAT'), 'desc'),
- DataField(5, "f5", AtomicType('DOUBLE'), 'desc'),
- DataField(6, "f6", AtomicType('BOOLEAN'), 'desc'),
- DataField(7, "f7", AtomicType('STRING'), 'desc'),
- DataField(8, "f8", AtomicType('BINARY(12)'), 'desc'),
- DataField(9, "f9", AtomicType('DECIMAL(10, 6)'), 'desc'),
- DataField(10, "f10", AtomicType('BYTES'), 'desc'),
- DataField(11, "f11", AtomicType('DATE'), 'desc'),
- DataField(12, "f12", AtomicType('TIME(0)'), 'desc'),
- DataField(13, "f13", AtomicType('TIME(3)'), 'desc'),
- DataField(14, "f14", AtomicType('TIME(6)'), 'desc'),
- DataField(15, "f15", AtomicType('TIME(9)'), 'desc'),
- DataField(16, "f16", AtomicType('TIMESTAMP(0)'), 'desc'),
- DataField(17, "f17", AtomicType('TIMESTAMP(3)'), 'desc'),
- DataField(18, "f18", AtomicType('TIMESTAMP(6)'), 'desc'),
- DataField(19, "f19", AtomicType('TIMESTAMP(9)'), 'desc'),
- DataField(20, "arr", ArrayType(True, AtomicType('INT')), 'desc
arr1'),
- DataField(21, "map1",
- MapType(False, AtomicType('INT', False),
- MapType(False, AtomicType('INT', False), AtomicType('INT', False))),
- 'desc map1'),
- ]
- table_schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields,
- max(field.id for field in data_fields),
- [], [], {}, "")
- pa_fields = []
- for field in table_schema.fields:
- pa_field = PyarrowFieldParser.from_paimon_field(field)
- pa_fields.append(pa_field)
- schema = Schema.from_pyarrow_schema(
- pa_schema=pyarrow.schema(pa_fields),
- partition_keys=table_schema.partition_keys,
- primary_keys=table_schema.primary_keys,
- options=table_schema.options,
- comment=table_schema.comment
- )
- table_schema2 = TableSchema.from_schema(len(data_fields), schema)
- l1 = []
- for field in table_schema.fields:
- l1.append(field.to_dict())
- l2 = []
- for field in table_schema2.fields:
- l2.append(field.to_dict())
- self.assertEqual(l1, l2)
diff --git a/paimon-python/pypaimon/tests/writer_test.py
b/paimon-python/pypaimon/tests/writer_test.py
deleted file mode 100644
index 34f7c53e70..0000000000
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import glob
-import os
-import shutil
-import tempfile
-import unittest
-
-import pyarrow
-
-from pypaimon import CatalogFactory
-from pypaimon import Schema
-
-
-class WriterTest(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.temp_dir = tempfile.mkdtemp(prefix="unittest_")
- cls.warehouse = os.path.join(cls.temp_dir, 'test_dir')
-
- @classmethod
- def tearDownClass(cls):
- shutil.rmtree(cls.temp_dir, ignore_errors=True)
-
- def test_writer(self):
- pa_schema = pyarrow.schema([
- ('f0', pyarrow.int32()),
- ('f1', pyarrow.string()),
- ('f2', pyarrow.string())
- ])
- catalog = CatalogFactory.create({
- "warehouse": self.warehouse
- })
- catalog.create_database("test_db", False)
- catalog.create_table("test_db.test_table",
Schema.from_pyarrow_schema(pa_schema), False)
- table = catalog.get_table("test_db.test_table")
-
- data = {
- 'f0': [1, 2, 3],
- 'f1': ['a', 'b', 'c'],
- 'f2': ['X', 'Y', 'Z']
- }
- expect = pyarrow.Table.from_pydict(data, schema=pa_schema)
-
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- table_write.write_arrow(expect)
- commit_messages = table_write.prepare_commit()
- table_commit.commit(commit_messages)
- table_write.close()
- table_commit.close()
-
- self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/LATEST"))
- self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1"))
- self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest"))
- self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
- self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*")), 3)
- self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
-
- with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
- content = ''.join(file.readlines())
- self.assertTrue(content.__contains__('\"totalRecordCount\": 3'))
- self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))
-
- write_builder = table.new_batch_write_builder()
- table_write = write_builder.new_write()
- table_commit = write_builder.new_commit()
- table_write.write_arrow(expect)
- commit_messages = table_write.prepare_commit()
- table_commit.commit(commit_messages)
- table_write.close()
- table_commit.close()
-
- with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-2', 'r', encoding='utf-8') as file:
- content = ''.join(file.readlines())
- self.assertTrue(content.__contains__('\"totalRecordCount\": 6'))
- self.assertTrue(content.__contains__('\"deltaRecordCount\": 3'))