This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4ea66af1d6 [python] Support schema manager for PyPaimon (#5974)
4ea66af1d6 is described below
commit 4ea66af1d6694fcd8fbffdc5a70d51c9074664d7
Author: ChengHui Chen <[email protected]>
AuthorDate: Tue Jul 29 20:54:40 2025 +0800
[python] Support schema manager for PyPaimon (#5974)
---
paimon-python/dev/lint-python.sh | 2 +-
paimon-python/pypaimon/api/api_response.py | 2 +-
paimon-python/pypaimon/api/data_types.py | 86 ++++++++++
paimon-python/pypaimon/api/table_schema.py | 83 ----------
paimon-python/pypaimon/catalog/table_metadata.py | 2 +-
paimon-python/pypaimon/common/file_io.py | 14 +-
paimon-python/pypaimon/rest/rest_catalog.py | 4 +-
paimon-python/pypaimon/{api => schema}/schema.py | 0
paimon-python/pypaimon/schema/schema_manager.py | 83 ++++++++++
paimon-python/pypaimon/schema/table_schema.py | 174 +++++++++++++++++++++
paimon-python/pypaimon/table/file_store_table.py | 2 +-
.../pypaimon/table/file_store_table_factory.py | 2 +-
paimon-python/pypaimon/tests/api_test.py | 8 +-
paimon-python/pypaimon/tests/pvfs_test.py | 5 +-
paimon-python/pypaimon/tests/rest_server.py | 3 +-
15 files changed, 373 insertions(+), 97 deletions(-)
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index 9c53310399..ae214d35f6 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -133,7 +133,7 @@ function check_stage() {
#########################
# Flake8 check
function flake8_check() {
- local PYTHON_SOURCE="$(find . \( -path ./dev -o -path ./.tox \) -prune -o -type f -name "*.py" -print )"
+ local PYTHON_SOURCE="$(find . \( -path ./dev -o -path ./.tox -o -path ./.venv \) -prune -o -type f -name "*.py" -print )"
print_function "STAGE" "flake8 checks"
if [ ! -f "$FLAKE8_PATH" ]; then
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index b8d559d611..b3ec148e94 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -21,7 +21,7 @@ from typing import Dict, Optional, Generic, List
from dataclasses import dataclass
from .rest_json import json_field
-from .schema import Schema
+from pypaimon.schema.schema import Schema
from .typedef import T
diff --git a/paimon-python/pypaimon/api/data_types.py b/paimon-python/pypaimon/api/data_types.py
index 6e6ea97456..c6cf516ad5 100644
--- a/paimon-python/pypaimon/api/data_types.py
+++ b/paimon-python/pypaimon/api/data_types.py
@@ -16,12 +16,15 @@
# under the License.
import json
+import re
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Any, Optional, List, Union
+import pyarrow
+
class AtomicInteger:
@@ -192,6 +195,43 @@ class DataField:
return result
+ def to_pyarrow_field(self):
+ data_type = self.type
+ if not isinstance(data_type, AtomicType):
+ raise ValueError(f"Unsupported data type: {data_type.__class__}")
+ type_name = data_type.type.upper()
+ if type_name == 'INT':
+ type_name = pyarrow.int32()
+ elif type_name == 'BIGINT':
+ type_name = pyarrow.int64()
+ elif type_name == 'FLOAT':
+ type_name = pyarrow.float32()
+ elif type_name == 'DOUBLE':
+ type_name = pyarrow.float64()
+ elif type_name == 'BOOLEAN':
+ type_name = pyarrow.bool_()
+ elif type_name == 'STRING':
+ type_name = pyarrow.string()
+ elif type_name == 'BINARY':
+ type_name = pyarrow.binary()
+ elif type_name == 'DATE':
+ type_name = pyarrow.date32()
+ elif type_name == 'TIMESTAMP':
+ type_name = pyarrow.timestamp('ms')
+ elif type_name.startswith('DECIMAL'):
+ match = re.match(r'DECIMAL\((\d+),\s*(\d+)\)', type_name)
+ if match:
+ precision, scale = map(int, match.groups())
+ type_name = pyarrow.decimal128(precision, scale)
+ else:
+ type_name = pyarrow.decimal128(38, 18)
+ else:
+ raise ValueError(f"Unsupported data type: {type_name}")
+ metadata = {}
+ if self.description:
+ metadata[b'description'] = self.description.encode('utf-8')
+ return pyarrow.field(self.name, type_name, nullable=data_type.nullable, metadata=metadata)
+
@dataclass
class RowType(DataType):
@@ -266,6 +306,36 @@ class DataTypeParser:
except ValueError:
raise Exception(f"Unknown type: {base_type}")
+ @staticmethod
+ def parse_atomic_type_pyarrow_field(field: pyarrow.Field) -> DataType:
+ type_name = str(field.type)
+ if type_name.startswith('int') or type_name.startswith('uint'):
+ type_name = 'INT'
+ elif type_name.startswith('float'):
+ type_name = 'FLOAT'
+ elif type_name.startswith('double'):
+ type_name = 'DOUBLE'
+ elif type_name.startswith('bool'):
+ type_name = 'BOOLEAN'
+ elif type_name.startswith('string'):
+ type_name = 'STRING'
+ elif type_name.startswith('binary'):
+ type_name = 'BINARY'
+ elif type_name.startswith('date'):
+ type_name = 'DATE'
+ elif type_name.startswith('timestamp'):
+ type_name = 'TIMESTAMP'
+ elif type_name.startswith('decimal'):
+ match = re.match(r'decimal\((\d+),\s*(\d+)\)', type_name)
+ if match:
+ precision, scale = map(int, match.groups())
+ type_name = f'DECIMAL({precision},{scale})'
+ else:
+ type_name = 'DECIMAL(38,18)'
+ else:
+ raise ValueError(f"Unknown type: {type_name}")
+ return AtomicType(type_name, field.nullable)
+
@staticmethod
def parse_data_type(
json_data: Union[Dict[str, Any], str], field_id: Optional[AtomicInteger] = None
@@ -370,3 +440,19 @@ def parse_data_field_from_json(
) -> DataField:
json_data = json.loads(json_str)
return DataTypeParser.parse_data_field(json_data, field_id)
+
+
+def parse_data_fields_from_pyarrow_schema(pa_schema: pyarrow.Schema) -> list[DataField]:
+ fields = []
+ for i, pa_field in enumerate(pa_schema):
+ pa_field: pyarrow.Field
+ data_type = DataTypeParser.parse_atomic_type_pyarrow_field(pa_field)
+ data_field = DataField(
+ id=i,
+ name=pa_field.name,
+ type=data_type,
+ description=pa_field.metadata.get(b'description', b'').decode('utf-8') if pa_field.metadata and b'description' in pa_field.metadata else None
+ )
+ fields.append(data_field)
+ return fields
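
For context, the two pyarrow bridges added above compose into a round trip
between Paimon fields and pyarrow schemas. A minimal sketch, illustrative
only and not part of this commit; it assumes AtomicType defaults to
nullable=True, as the test fixtures in this diff suggest:

    import pyarrow

    from pypaimon.api.data_types import (
        AtomicType, DataField, parse_data_fields_from_pyarrow_schema,
    )

    fields = [
        DataField(0, "id", AtomicType('INT'), 'user id'),
        DataField(1, "name", AtomicType('STRING'), 'user name'),
    ]

    # DataField -> pyarrow.Field; the description travels as field metadata
    pa_schema = pyarrow.schema([f.to_pyarrow_field() for f in fields])

    # pyarrow.Schema -> [DataField]; field ids are assigned by position
    round_tripped = parse_data_fields_from_pyarrow_schema(pa_schema)
    assert [f.name for f in round_tripped] == ["id", "name"]
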
diff --git a/paimon-python/pypaimon/api/table_schema.py b/paimon-python/pypaimon/api/table_schema.py
deleted file mode 100644
index ac0c2b7a11..0000000000
--- a/paimon-python/pypaimon/api/table_schema.py
+++ /dev/null
@@ -1,83 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import time
-from typing import List, Dict, Optional
-
-from pypaimon.api.schema import Schema
-from pypaimon.api.data_types import DataField
-
-
-class TableSchema:
-
- def __init__(self, id: int, fields: List[DataField], highest_field_id: int,
- partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
- comment: Optional[str] = None, time_millis: Optional[int] = None):
- self.id = id
- self.fields = fields
- self.highest_field_id = highest_field_id
- self.partition_keys = partition_keys or []
- self.primary_keys = primary_keys or []
- self.options = options or {}
- self.comment = comment
- self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
-
- def to_schema(self) -> Schema:
- # pa_schema = schema_util.convert_data_fields_to_pa_schema(self.fields)
- return Schema(
- fields=self.fields,
- partition_keys=self.partition_keys,
- primary_keys=self.primary_keys,
- options=self.options,
- comment=self.comment
- )
-
- @staticmethod
- def create(schema_id: int, schema: Schema) -> "TableSchema":
- fields: List[DataField] = schema.fields
-
- partition_keys: List[str] = schema.partition_keys
-
- primary_keys: List[str] = schema.primary_keys
-
- options: Dict[str, str] = schema.options
-
- highest_field_id: int = None
-
- return TableSchema(
- schema_id,
- fields,
- highest_field_id,
- partition_keys,
- primary_keys,
- options,
- schema.comment,
- int(time.time())
- )
-
- def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
- return TableSchema(
- id=self.id,
- fields=self.fields,
- highest_field_id=self.highest_field_id,
- partition_keys=self.partition_keys,
- primary_keys=self.primary_keys,
- options=new_options,
- comment=self.comment,
- time_millis=self.time_millis
- )
diff --git a/paimon-python/pypaimon/catalog/table_metadata.py b/paimon-python/pypaimon/catalog/table_metadata.py
index db79ac522a..02297efaa8 100644
--- a/paimon-python/pypaimon/catalog/table_metadata.py
+++ b/paimon-python/pypaimon/catalog/table_metadata.py
@@ -17,7 +17,7 @@ limitations under the License.
"""
from typing import Optional
-from pypaimon.api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
class TableMetadata:
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 438b73ce1d..9e7418966f 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -22,4 +22,16 @@ from pathlib import Path
class FileIO(ABC):
@abstractmethod
def exists(self, path: Path) -> bool:
- raise NotImplementedError("Method 'exists' must be implemented by subclasses.")
+ """"""
+
+ @abstractmethod
+ def read_file_utf8(self, path: Path) -> str:
+ """"""
+
+ @abstractmethod
+ def try_to_write_atomic(self, path: Path, content: str) -> bool:
+ """"""
+
+ @abstractmethod
+ def list_status(self, path: Path):
+ """"""
diff --git a/paimon-python/pypaimon/rest/rest_catalog.py b/paimon-python/pypaimon/rest/rest_catalog.py
index 364fec8247..c1ba37cd78 100644
--- a/paimon-python/pypaimon/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/rest/rest_catalog.py
@@ -24,7 +24,7 @@ from pypaimon.api.core_options import CoreOptions
from pypaimon.api.identifier import Identifier
from pypaimon.api.options import Options
-from pypaimon.api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
@@ -98,7 +98,7 @@ class RESTCatalog(Catalog):
return self.to_table_metadata(identifier.get_database_name(), response)
def to_table_metadata(self, db: str, response: GetTableResponse) -> TableMetadata:
- schema = TableSchema.create(response.get_schema_id(), response.get_schema())
+ schema = TableSchema.from_schema(response.get_schema_id(), response.get_schema())
options: Dict[str, str] = dict(schema.options)
options[CoreOptions.PATH] = response.get_path()
response.put_audit_options_to(options)
diff --git a/paimon-python/pypaimon/api/schema.py b/paimon-python/pypaimon/schema/schema.py
similarity index 100%
rename from paimon-python/pypaimon/api/schema.py
rename to paimon-python/pypaimon/schema/schema.py
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index c87240062b..43e773c46e 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -16,8 +16,11 @@
# limitations under the License.
################################################################################
from pathlib import Path
+from typing import Optional
from pypaimon.common.file_io import FileIO
+from pypaimon.schema.schema import Schema
+from pypaimon.schema.table_schema import TableSchema
class SchemaManager:
@@ -27,3 +30,83 @@ class SchemaManager:
self.file_io = file_io
self.table_path = table_path
self.schema_path = table_path / "schema"
+
+ def latest(self) -> Optional['TableSchema']:
+ try:
+ versions = self._list_versioned_files()
+ if not versions:
+ return None
+
+ max_version = max(versions)
+ return self._read_schema(max_version)
+ except Exception as e:
+ raise RuntimeError(f"Failed to load schema from path:
{self.schema_path}") from e
+
+ def create_table(self, schema: Schema, external_table: bool = False) -> TableSchema:
+ while True:
+ latest = self.latest()
+ if latest is not None:
+ if external_table:
+ self._check_schema_for_external_table(latest.to_schema(), schema)
+ return latest
+ else:
+ raise RuntimeError("Schema in filesystem exists, creation
is not allowed.")
+
+ table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
+ success = self.commit(table_schema)
+ if success:
+ return table_schema
+
+ def commit(self, new_schema: TableSchema) -> bool:
+ schema_path = self._to_schema_path(new_schema.id)
+ try:
+ return self.file_io.try_to_write_atomic(schema_path, new_schema.to_json())
+ except Exception as e:
+ raise RuntimeError(f"Failed to commit schema: {e}") from e
+
+ def _to_schema_path(self, schema_id: int) -> Path:
+ return self.schema_path / f"{self.schema_prefix}{schema_id}"
+
+ def _read_schema(self, schema_id: int) -> Optional['TableSchema']:
+ schema_path = self._to_schema_path(schema_id)
+ if not self.file_io.exists(schema_path):
+ return None
+
+ return TableSchema.from_path(self.file_io, schema_path)
+
+ def _list_versioned_files(self) -> list[int]:
+ if not self.file_io.exists(self.schema_path):
+ return []
+
+ statuses = self.file_io.list_status(self.schema_path)
+ if statuses is None:
+ return []
+
+ versions = []
+ for status in statuses:
+ name = Path(status.path).name
+ if name.startswith(self.schema_prefix):
+ try:
+ version = int(name[len(self.schema_prefix):])
+ versions.append(version)
+ except ValueError:
+ continue
+ return versions
+
+ @staticmethod
+ def _check_schema_for_external_table(exists_schema: Schema, new_schema: Schema):
+ if ((not new_schema.pa_schema or new_schema.pa_schema.equals(exists_schema.pa_schema))
+ and (not new_schema.partition_keys or new_schema.partition_keys == exists_schema.partition_keys)
+ and (not new_schema.primary_keys or new_schema.primary_keys == exists_schema.primary_keys)):
+ exists_options = exists_schema.options
+ new_options = new_schema.options
+ for key, value in new_options.items():
+ if (key != 'owner' and key != 'path'
+ and (key not in exists_options or exists_options[key] != value)):
+ raise ValueError(
+ f"New schema's options are not equal to the exists
schema's, "
+ f"new schema: {new_options}, exists schema:
{exists_options}")
+ else:
+ raise ValueError(
+ f"New schema is not equal to the exists schema, "
+ f"new schema: {new_schema}, exists schema: {exists_schema}")
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
new file mode 100644
index 0000000000..11ee24ae98
--- /dev/null
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -0,0 +1,174 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import json
+import time
+from pathlib import Path
+from typing import List, Dict, Optional
+
+import pyarrow
+
+from pypaimon.api import data_types
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.file_io import FileIO
+from pypaimon.schema.schema import Schema
+from pypaimon.api.data_types import DataField
+
+
+class TableSchema:
+ PAIMON_07_VERSION = 1
+ PAIMON_08_VERSION = 2
+ CURRENT_VERSION = 3
+
+ def __init__(self, version: int, id: int, fields: List[DataField], highest_field_id: int,
+ partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
+ comment: Optional[str] = None, time_millis: Optional[int] = None):
+ self.version = version
+ self.id = id
+ self.fields = fields
+ self.highest_field_id = highest_field_id
+ self.partition_keys = partition_keys or []
+ self.primary_keys = primary_keys or []
+ self.options = options or {}
+ self.comment = comment
+ self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
+
+ def to_schema(self) -> Schema:
+ try:
+ pa_fields = []
+ for field in self.fields:
+ pa_fields.append(field.to_pyarrow_field())
+ pyarrow.schema(pa_fields)
+ except Exception as e:
+ print(e)
+ return Schema(
+ fields=self.fields,
+ partition_keys=self.partition_keys,
+ primary_keys=self.primary_keys,
+ options=self.options,
+ comment=self.comment
+ )
+
+ @staticmethod
+ def from_path(file_io: FileIO, schema_path: Path):
+ try:
+ json_str = file_io.read_file_utf8(schema_path)
+ return TableSchema.from_json(json_str)
+ except FileNotFoundError as e:
+ raise RuntimeError(f"Schema file not found: {schema_path}") from e
+ except Exception as e:
+ raise RuntimeError(f"Failed to read schema from {schema_path}")
from e
+
+ @staticmethod
+ def from_json(json_str: str):
+ try:
+ data = json.loads(json_str)
+
+ version = data.get("version", TableSchema.PAIMON_07_VERSION)
+ options = data["options"]
+ if version <= TableSchema.PAIMON_07_VERSION and CoreOptions.BUCKET not in options:
+ options[CoreOptions.BUCKET] = "1"
+ if version <= TableSchema.PAIMON_08_VERSION and CoreOptions.FILE_FORMAT not in options:
+ options[CoreOptions.FILE_FORMAT] = "orc"
+ fields = [DataField.from_dict(field) for field in data["fields"]]
+
+ return TableSchema(
+ version=version,
+ id=data["id"],
+ fields=fields,
+ highest_field_id=data["highestFieldId"],
+ partition_keys=data["partitionKeys"],
+ primary_keys=data["primaryKeys"],
+ options=options,
+ comment=data.get("comment"),
+ time_millis=data.get("timeMillis")
+ )
+ except json.JSONDecodeError as e:
+ raise RuntimeError(f"Invalid JSON format: {json_str}") from e
+ except KeyError as e:
+ raise RuntimeError(f"Missing required field in schema JSON: {e}")
from e
+ except Exception as e:
+ raise RuntimeError(f"Failed to parse schema from JSON: {e}") from e
+
+ @staticmethod
+ def from_schema(schema_id: int, schema: Schema) -> "TableSchema":
+ fields: List[DataField] = schema.fields
+ if not schema.fields:
+ fields = data_types.parse_data_fields_from_pyarrow_schema(schema.pa_schema)
+ partition_keys: List[str] = schema.partition_keys
+ primary_keys: List[str] = schema.primary_keys
+ options: Dict[str, str] = schema.options
+ highest_field_id: int = None # max(field.id for field in fields)
+
+ return TableSchema(
+ TableSchema.CURRENT_VERSION,
+ schema_id,
+ fields,
+ highest_field_id,
+ partition_keys,
+ primary_keys,
+ options,
+ schema.comment,
+ int(time.time())
+ )
+
+ def to_json(self) -> str:
+ data = {
+ "version": self.version,
+ "id": self.id,
+ "fields": [field.to_dict() for field in self.fields],
+ "highestFieldId": self.highest_field_id,
+ "partitionKeys": self.partition_keys,
+ "primaryKeys": self.primary_keys,
+ "options": self.options,
+ "timeMillis": self.time_millis
+ }
+ if self.comment is not None:
+ data["comment"] = self.comment
+ return json.dumps(data, indent=2, ensure_ascii=False)
+
+ def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
+ return TableSchema(
+ version=self.version,
+ id=self.id,
+ fields=self.fields,
+ highest_field_id=self.highest_field_id,
+ partition_keys=self.partition_keys,
+ primary_keys=self.primary_keys,
+ options=new_options,
+ comment=self.comment,
+ time_millis=self.time_millis
+ )
+
+ def get_primary_key_fields(self) -> List[DataField]:
+ if not self.primary_keys:
+ return []
+ field_map = {field.name: field for field in self.fields}
+ return [field_map[name] for name in self.primary_keys if name in field_map]
+
+ def get_partition_key_fields(self) -> List[DataField]:
+ if not self.partition_keys:
+ return []
+ field_map = {field.name: field for field in self.fields}
+ return [field_map[name] for name in self.partition_keys if name in field_map]
+
+ def get_trimmed_primary_key_fields(self) -> List[DataField]:
+ if not self.primary_keys or not self.partition_keys:
+ return self.get_primary_key_fields()
+ adjusted = [pk for pk in self.primary_keys if pk not in self.partition_keys]
+ field_map = {field.name: field for field in self.fields}
+ return [field_map[name] for name in adjusted if name in field_map]
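
Note the version gate in from_json above: schema files written by older
Paimon releases are backfilled with defaults (bucket for version 1, file
format "orc" for version 2) before construction. A small sketch, not part
of the commit; the concrete option keys come from CoreOptions and are
deliberately not asserted here:

    from pypaimon.schema.table_schema import TableSchema

    legacy_json = """
    {
      "id": 0,
      "fields": [],
      "highestFieldId": -1,
      "partitionKeys": [],
      "primaryKeys": [],
      "options": {}
    }
    """
    ts = TableSchema.from_json(legacy_json)
    assert ts.version == TableSchema.PAIMON_07_VERSION  # missing key -> 1
    # ts.options now carries the backfilled bucket/file-format defaults.
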
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index de5f8a870f..3e8f5bb166 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -19,7 +19,7 @@
from pathlib import Path
from pypaimon.api.identifier import Identifier
-from pypaimon.api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
from pypaimon.common.file_io import FileIO
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.table.table import Table
diff --git a/paimon-python/pypaimon/table/file_store_table_factory.py b/paimon-python/pypaimon/table/file_store_table_factory.py
index 5569f60d8e..a8a2142a38 100644
--- a/paimon-python/pypaimon/table/file_store_table_factory.py
+++ b/paimon-python/pypaimon/table/file_store_table_factory.py
@@ -17,7 +17,7 @@ limitations under the License.
"""
from pathlib import Path
-from pypaimon.api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
from pypaimon.common.file_io import FileIO
from pypaimon.table.catalog_environment import CatalogEnvironment
from pypaimon.table.file_store_table import FileStoreTable
diff --git a/paimon-python/pypaimon/tests/api_test.py b/paimon-python/pypaimon/tests/api_test.py
index d32c5c201c..8012a5a364 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -27,7 +27,7 @@ from ..api.auth import BearTokenAuthProvider
from ..api.identifier import Identifier
from ..api.options import Options
from ..api.rest_json import JSON
-from ..api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
from ..api.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType, DataField
@@ -157,7 +157,8 @@ class ApiTestCase(unittest.TestCase):
MapType(False,
AtomicType('INT'), AtomicType('INT'))),
'desc arr11'),
]
- schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "")
+ schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, len(data_fields),
+ [], [], {}, "")
test_tables = {
"default.user": TableMetadata(uuid=str(uuid.uuid4()),
is_external=True, schema=schema),
}
@@ -213,7 +214,8 @@ class ApiTestCase(unittest.TestCase):
MapType(False,
AtomicType('INT'), AtomicType('INT'))),
'desc arr11'),
]
- schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "")
+ schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, len(data_fields),
+ [], [], {}, "")
test_tables = {
"default.user": TableMetadata(uuid=str(uuid.uuid4()),
is_external=True, schema=schema),
}
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py
index c78fcf94a0..c20f337302 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -25,7 +25,7 @@ from pathlib import Path
from pypaimon.api import ConfigResponse
from pypaimon.api.auth import BearTokenAuthProvider
from pypaimon.api.data_types import DataField, AtomicType
-from pypaimon.api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.pvfs import PaimonVirtualFileSystem
from pypaimon.tests.api_test import RESTCatalogServer
@@ -70,7 +70,8 @@ class PVFSTestCase(unittest.TestCase):
DataField(0, "id", AtomicType('INT'), 'id'),
DataField(1, "name", AtomicType('STRING'), 'name')
]
- schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "")
+ schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, len(data_fields),
+ [], [], {}, "")
self.server.database_store.update(self.test_databases)
self.test_tables = {
f"{self.database}.{self.table}":
TableMetadata(uuid=str(uuid.uuid4()), is_external=True, schema=schema),
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py
index 0bcf5d1dd3..27ba27907c 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -33,7 +33,7 @@ from ..api.api_response import (ConfigResponse, ListDatabasesResponse, GetDataba
Schema, GetTableResponse, ListTablesResponse, RESTResponse, PagedList)
from ..api.rest_json import JSON
-from ..api.table_schema import TableSchema
+from pypaimon.schema.table_schema import TableSchema
from ..catalog.table_metadata import TableMetadata
@@ -642,6 +642,7 @@ class RESTCatalogServer:
"""Create table metadata"""
options = schema.options.copy()
table_schema = TableSchema(
+ version=TableSchema.CURRENT_VERSION,
id=schema_id,
fields=schema.fields,
highest_field_id=len(schema.fields) - 1,