This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0ca2e59928 [Python] Support write table for RESTCatalog (#6030)
0ca2e59928 is described below
commit 0ca2e5992862cf0c6516b24ab620e876a6e896ab
Author: HeavenZH <[email protected]>
AuthorDate: Thu Aug 7 10:31:31 2025 +0800
[Python] Support write table for RESTCatalog (#6030)
---
.github/workflows/e2e-tests-flink-1.x.yml | 1 +
.github/workflows/e2e-tests-flink-2.x-jdk11.yml | 1 +
.github/workflows/paimon-python-checks.yml | 1 +
.github/workflows/utitcase-flink-1.x.yml | 1 +
.github/workflows/utitcase-flink-2.x-jdk11.yml | 1 +
.github/workflows/utitcase-jdk11.yml | 1 +
.github/workflows/utitcase-spark-3.x.yml | 1 +
.github/workflows/utitcase-spark-4.x.yml | 1 +
.github/workflows/utitcase.yml | 1 +
paimon-python/pypaimon/api/__init__.py | 1 +
paimon-python/pypaimon/api/api_response.py | 8 --
paimon-python/pypaimon/catalog/catalog_factory.py | 6 +-
paimon-python/pypaimon/catalog/catalog_utils.py | 53 -------------
.../pypaimon/catalog/rest/rest_catalog.py | 64 +++++++++++++---
.../rest/rest_token.py} | 39 +++++-----
.../pypaimon/catalog/rest/rest_token_file_io.py | 48 +++++++++++-
paimon-python/pypaimon/common/file_io.py | 1 +
paimon-python/pypaimon/common/rest_json.py | 16 +++-
paimon-python/pypaimon/tests/rest_catalog_test.py | 87 ++++++++++++++++++++++
paimon-python/setup.py | 2 +-
20 files changed, 237 insertions(+), 97 deletions(-)
diff --git a/.github/workflows/e2e-tests-flink-1.x.yml b/.github/workflows/e2e-tests-flink-1.x.yml
index 6d5b34321e..d3b91cf494 100644
--- a/.github/workflows/e2e-tests-flink-1.x.yml
+++ b/.github/workflows/e2e-tests-flink-1.x.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 4a70181fb4..6333f9f4b9 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 11
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 9b726534ac..9a38e9b5c2 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -24,6 +24,7 @@ on:
paths:
- 'paimon-python/**'
- '!**/*.md'
+ - '.github/workflows/paimon-python-checks.yml'
env:
PYTHON_VERSION: "3.10"
diff --git a/.github/workflows/utitcase-flink-1.x.yml b/.github/workflows/utitcase-flink-1.x.yml
index 1bf1a04ba9..4703baf58c 100644
--- a/.github/workflows/utitcase-flink-1.x.yml
+++ b/.github/workflows/utitcase-flink-1.x.yml
@@ -24,6 +24,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 102a3e169e..40cd479001 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml
index 1fca33908a..f2e538fe89 100644
--- a/.github/workflows/utitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index 420847ad2b..5b17dc6e75 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index 65fcaa6080..605d7ded84 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 17
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index aa611de918..f16f5bf32a 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -25,6 +25,7 @@ on:
- 'docs/**'
- '**/*.md'
- 'paimon-python/**'
+ - '.github/workflows/paimon-python-checks.yml'
env:
JDK_VERSION: 8
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index e914335d4d..6806733c7a 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -126,6 +126,7 @@ class RESTApi:
PAGE_TOKEN = "pageToken"
DATABASE_NAME_PATTERN = "databaseNamePattern"
TABLE_NAME_PATTERN = "tableNamePattern"
+ TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
def __init__(self, options: Dict[str, str], config_required: bool = True):
self.logger = logging.getLogger(self.__class__.__name__)
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index 662a4eb798..17496f047e 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -135,14 +135,6 @@ class ListTablesResponse(PagedResponse[str]):
return self.next_page_token
-@dataclass
-class RESTToken:
- """REST authentication token"""
-
- token: Dict[str, str]
- expire_at_millis: int
-
-
@dataclass
class GetTableResponse(AuditRESTResponse):
"""Response for getting table"""
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py b/paimon-python/pypaimon/catalog/catalog_factory.py
index c8ab4b712c..36b3e4b55a 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -16,6 +16,8 @@
# limitations under the License.
################################################################################
from pypaimon.catalog.catalog import Catalog
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.common.config import CatalogOptions
@@ -35,4 +37,6 @@ class CatalogFactory:
if catalog_class is None:
raise ValueError(f"Unknown catalog identifier: {identifier}. "
f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
- return catalog_class(catalog_options)
+ return catalog_class(
+ CatalogContext.create_from_options(Options(catalog_options))) if
identifier == "rest" else catalog_class(
+ catalog_options)
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py b/paimon-python/pypaimon/catalog/catalog_utils.py
deleted file mode 100644
index 834aeee147..0000000000
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ /dev/null
@@ -1,53 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from pathlib import Path
-from typing import Any, Callable
-
-from pypaimon.catalog.table_metadata import TableMetadata
-from pypaimon.common.core_options import CoreOptions
-from pypaimon.common.identifier import Identifier
-from pypaimon.table.catalog_environment import CatalogEnvironment
-from pypaimon.table.file_store_table import FileStoreTable
-from pypaimon.table.file_store_table_factory import FileStoreTableFactory
-
-
-class CatalogUtils:
- @staticmethod
- def load_table(
- identifier: Identifier,
- internal_file_io: Callable[[Path], Any],
- external_file_io: Callable[[Path], Any],
- metadata_loader: Callable[[Identifier], TableMetadata],
- ) -> FileStoreTable:
- metadata = metadata_loader(identifier)
- schema = metadata.schema
- data_file_io = external_file_io if metadata.is_external else
internal_file_io
- catalog_env = CatalogEnvironment(
- identifier=identifier,
- uuid=metadata.uuid,
- catalog_loader=None,
- supports_version_management=False
- )
-
- path = Path(schema.options.get(CoreOptions.PATH))
- table = FileStoreTableFactory.create(data_file_io(path), path, schema,
catalog_env)
- return table
-
- @staticmethod
- def is_system_database(database_name: str) -> bool:
- return Catalog.SYSTEM_DATABASE_NAME.equals(database_name)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 1d32b1f43f..2a0be5b2ea 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -16,14 +16,19 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from pathlib import Path
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union, Callable, Any
+from urllib.parse import urlparse
+
+
+from pypaimon.api import RESTApi, CatalogOptions
+from pypaimon.api.api_response import PagedList, GetTableResponse
+
+from pypaimon.common.file_io import FileIO
-from pypaimon.api import CatalogOptions, RESTApi
-from pypaimon.api.api_response import GetTableResponse, PagedList
from pypaimon.api.options import Options
from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
-from pypaimon.catalog.catalog_utils import CatalogUtils
+
from pypaimon.catalog.database import Database
from pypaimon.catalog.property_change import PropertyChange
from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
@@ -32,11 +37,13 @@ from pypaimon.common.core_options import CoreOptions
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.catalog_environment import CatalogEnvironment
from pypaimon.table.file_store_table import FileStoreTable
class RESTCatalog(Catalog):
def __init__(self, context: CatalogContext, config_required:
Optional[bool] = True):
+ self.warehouse = context.options.get(CatalogOptions.WAREHOUSE)
self.api = RESTApi(context.options.to_map(), config_required)
self.context = CatalogContext.create(Options(self.api.options),
context.hadoop_conf, context.prefer_io_loader,
context.fallback_io_loader)
@@ -87,7 +94,7 @@ class RESTCatalog(Catalog):
def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable:
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
- return CatalogUtils.load_table(
+ return self.load_table(
identifier,
lambda path: self.file_io_for_data(path, identifier),
self.file_io_from_options,
@@ -95,7 +102,9 @@ class RESTCatalog(Catalog):
)
def create_table(self, identifier: Union[str, Identifier], schema: Schema,
ignore_if_exists: bool):
- raise ValueError("Not implemented")
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ self.api.create_table(identifier, schema)
def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
response = self.api.get_table(identifier)
@@ -117,8 +126,41 @@ class RESTCatalog(Catalog):
uuid=response.get_id()
)
- def file_io_from_options(self, path: Path):
- return None
-
- def file_io_for_data(self, path: Path, identifier: Identifier):
- return RESTTokenFileIO(identifier, path, None, None) if
self.data_token_enabled else None
+ def file_io_from_options(self, table_path: Path) -> FileIO:
+ return FileIO(str(table_path), self.context.options.data)
+
+ def file_io_for_data(self, table_path: Path, identifier: Identifier):
+ return RESTTokenFileIO(identifier, table_path,
self.context.options.data) \
+ if self.data_token_enabled else
self.file_io_from_options(table_path)
+
+ def load_table(self,
+ identifier: Identifier,
+ internal_file_io: Callable[[Path], Any],
+ external_file_io: Callable[[Path], Any],
+ metadata_loader: Callable[[Identifier], TableMetadata],
+ ) -> FileStoreTable:
+ metadata = metadata_loader(identifier)
+ schema = metadata.schema
+ data_file_io = external_file_io if metadata.is_external else
internal_file_io
+ catalog_env = CatalogEnvironment(
+ identifier=identifier,
+ uuid=metadata.uuid,
+ catalog_loader=None,
+ supports_version_management=False
+ )
+ path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
+ path = Path(path_parsed.path) if path_parsed.scheme is None else
Path(schema.options.get(CoreOptions.PATH))
+ table = self.create(data_file_io(path),
+ Path(path_parsed.netloc + "/" + path_parsed.path),
+ schema,
+ catalog_env)
+ return table
+
+ def create(self,
+ file_io: FileIO,
+ table_path: Path,
+ table_schema: TableSchema,
+ catalog_environment: CatalogEnvironment
+ ) -> FileStoreTable:
+ """Create FileStoreTable with dynamic options and catalog
environment"""
+ return FileStoreTable(file_io, catalog_environment.identifier,
table_path, table_schema)
diff --git a/paimon-python/pypaimon/table/file_store_table_factory.py b/paimon-python/pypaimon/catalog/rest/rest_token.py
similarity index 53%
rename from paimon-python/pypaimon/table/file_store_table_factory.py
rename to paimon-python/pypaimon/catalog/rest/rest_token.py
index 81ec64777b..716c508abc 100644
--- a/paimon-python/pypaimon/table/file_store_table_factory.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token.py
@@ -15,21 +15,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
-from pathlib import Path
-
-from pypaimon.common.file_io import FileIO
-from pypaimon.schema.table_schema import TableSchema
-from pypaimon.table.catalog_environment import CatalogEnvironment
-from pypaimon.table.file_store_table import FileStoreTable
-
-
-class FileStoreTableFactory:
- @staticmethod
- def create(
- file_io: FileIO,
- table_path: Path,
- table_schema: TableSchema,
- catalog_environment: CatalogEnvironment
- ) -> FileStoreTable:
- """Create FileStoreTable with dynamic options and catalog
environment"""
- return FileStoreTable(file_io, catalog_environment.identifier,
table_path, table_schema)
+from typing import Dict, Optional
+
+
+class RESTToken:
+
+ def __init__(self, token: Dict[str, str], expire_at_millis: int):
+ self.token = token
+ self.expire_at_millis = expire_at_millis
+ self.hash: Optional[int] = None
+
+ def __eq__(self, other: object) -> bool:
+ if other is None or not isinstance(other, RESTToken):
+ return False
+
+ return (self.expire_at_millis == other.expire_at_millis and
+ self.token == other.token)
+
+ def __hash__(self) -> int:
+ if self.hash is None:
+ self.hash = hash((frozenset(self.token.items()),
self.expire_at_millis))
+ return self.hash
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 291efdbf2d..003e9a4ef2 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -15,20 +15,60 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import logging
+import threading
+import time
from pathlib import Path
from typing import Optional
+from pyarrow._fs import FileSystem
+
+from pypaimon.api import RESTApi
+from pypaimon.catalog.rest.rest_token import RESTToken
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
class RESTTokenFileIO(FileIO):
- def __init__(self, identifier: Identifier, path: Path, warehouse:
Optional[str] = None,
+ def __init__(self, identifier: Identifier, path: Path,
catalog_options: Optional[dict] = None):
- super().__init__(warehouse, catalog_options)
self.identifier = identifier
self.path = path
+ self.token: Optional[RESTToken] = None
+ self.api_instance: Optional[RESTApi] = None
+ self.lock = threading.Lock()
+ self.log = logging.getLogger(__name__)
+ super().__init__(str(path), catalog_options)
+
+ def _initialize_oss_fs(self) -> FileSystem:
+ self.try_to_refresh_token()
+ self.properties.update(self.token.token)
+ return super()._initialize_oss_fs()
+
+ def try_to_refresh_token(self):
+ if self.should_refresh():
+ with self.lock:
+ if self.should_refresh():
+ self.refresh_token()
+
+ def should_refresh(self):
+ if self.token is None:
+ return True
+ current_time = int(time.time() * 1000)
+ return (self.token.expire_at_millis - current_time) <
RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+
+ def refresh_token(self):
+ self.log.info(f"begin refresh data token for identifier
[{self.identifier}]")
+ if self.api_instance is None:
+ self.api_instance = RESTApi(self.properties, False)
+
+ response = self.api_instance.load_table_token(self.identifier)
+ self.log.info(
+ f"end refresh data token for identifier [{self.identifier}]
expiresAtMillis [{response.expires_at_millis}]"
+ )
+ self.token = RESTToken(response.token, response.expires_at_millis)
- def exists(self, path: Path) -> bool:
- pass
+ def valid_token(self):
+ self.try_to_refresh_token()
+ return self.token
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f1b40e2239..50ecae96cb 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -26,6 +26,7 @@ from urllib.parse import splitport, urlparse
import pyarrow
from pyarrow._fs import FileSystem
+
from pypaimon.common.config import OssOptions, S3Options
from pypaimon.schema.data_types import PyarrowFieldParser
diff --git a/paimon-python/pypaimon/common/rest_json.py b/paimon-python/pypaimon/common/rest_json.py
index 6448169d72..aa959e1c53 100644
--- a/paimon-python/pypaimon/common/rest_json.py
+++ b/paimon-python/pypaimon/common/rest_json.py
@@ -81,6 +81,8 @@ class JSON:
field_type = args[0]
if is_dataclass(field_type):
type_mapping[json_name] = field_type
+ elif origin_type is list and is_dataclass(args[0]):
+ type_mapping[json_name] = field_info.type
# Map JSON data to field names
kwargs = {}
@@ -88,7 +90,19 @@ class JSON:
if json_name in field_mapping:
field_name = field_mapping[json_name]
if json_name in type_mapping:
- kwargs[field_name] = JSON.__from_dict(value,
type_mapping[json_name])
+ tp = get_origin(type_mapping[json_name])
+ if tp is list:
+ item_type = get_args(type_mapping[json_name])[0]
+ if is_dataclass(item_type):
+ kwargs[field_name] = [
+ item_type.from_dict(item)
+ if hasattr(item_type, "to_dict")
+ else JSON.__from_dict(item, item_type)
+ for item in value]
+ else:
+ kwargs[field_name] = value
+ else:
+ kwargs[field_name] = JSON.__from_dict(value,
type_mapping[json_name])
else:
kwargs[field_name] = value
diff --git a/paimon-python/pypaimon/tests/rest_catalog_test.py b/paimon-python/pypaimon/tests/rest_catalog_test.py
index 8200fcc658..341d463c59 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_test.py
@@ -15,23 +15,38 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import glob
import logging
+import os
+import shutil
+import tempfile
import unittest
import uuid
+import pyarrow as pa
+
+
from pypaimon.api import ConfigResponse, Identifier
from pypaimon.api.auth import BearTokenAuthProvider
from pypaimon.api.options import Options
from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
MapType)
+from pypaimon.schema.schema import Schema
from pypaimon.schema.table_schema import TableSchema
from pypaimon.tests.rest_server import RESTCatalogServer
class RESTCatalogTestCase(unittest.TestCase):
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+ self.tmp_path = os.path.join(self.temp_dir, 'test_dir')
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_rest_catalog(self):
"""Example usage of RESTCatalogServer"""
@@ -88,3 +103,75 @@ class RESTCatalogTestCase(unittest.TestCase):
# Shutdown server
server.shutdown()
print("Server stopped")
+
+ def test_write(self):
+ """Example usage of RESTCatalogServer"""
+ # Setup logging
+ logging.basicConfig(level=logging.INFO)
+
+ # Create config
+ config = ConfigResponse(defaults={"prefix": "mock-test"})
+ token = str(uuid.uuid4())
+ warehouse = "test_warehouse"
+ # Create server
+ server = RESTCatalogServer(
+ data_path=self.tmp_path,
+ auth_provider=BearTokenAuthProvider(token),
+ config=config,
+ warehouse=warehouse
+ )
+ try:
+ # Start server
+ server.start()
+ print(f"Server started at: {server.get_url()}")
+
+ pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string())
+ ])
+ fields = [
+ DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+ DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
+ DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+ ]
+ options = {
+ 'metastore': 'rest',
+ 'uri': f"http://localhost:{server.port}",
+ 'warehouse': 'test_warehouse',
+ 'dlf.region': 'cn-hangzhou',
+ "token.provider": "bear",
+ 'token': token,
+
+ }
+ catalog = CatalogFactory.create(options)
+ catalog.create_database("test_db", False)
+ catalog.create_table("test_db.test_table", Schema(fields=fields),
False)
+ table = catalog.get_table("test_db.test_table")
+
+ data = {
+ 'f0': [1, 2, 3],
+ 'f1': ['a', 'b', 'c'],
+ 'f2': ['X', 'Y', 'Z']
+ }
+ expect = pa.Table.from_pydict(data, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ self.assertTrue(os.path.exists(self.tmp_path +
"/test_warehouse/test_db/test_table/snapshot/LATEST"))
+ self.assertTrue(os.path.exists(self.tmp_path +
"/test_warehouse/test_db/test_table/snapshot/snapshot-1"))
+ self.assertTrue(os.path.exists(self.tmp_path +
"/test_warehouse/test_db/test_table/manifest"))
+ self.assertTrue(os.path.exists(self.tmp_path +
"/test_warehouse/test_db/test_table/bucket-0"))
+ self.assertEqual(len(glob.glob(self.tmp_path +
"/test_warehouse/test_db/test_table/manifest/*.avro")), 2)
+ self.assertEqual(len(glob.glob(self.tmp_path +
"/test_warehouse/test_db/test_table/bucket-0/*.parquet")), 1)
+ finally:
+ # Shutdown server
+ server.shutdown()
+ print("Server stopped")
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 29e4c559d1..507844685c 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -28,7 +28,7 @@ install_requires = [
'ossfs==2023.12.0',
'pyarrow==16.0.0',
'polars==1.32.0',
- 'fastavro==1.11.1',
+ 'fastavro==1.11.1'
]
long_description = "See Apache Paimon Python API \