This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ca2e59928 [Python] Support write table for RESTCatalog (#6030)
0ca2e59928 is described below

commit 0ca2e5992862cf0c6516b24ab620e876a6e896ab
Author: HeavenZH <[email protected]>
AuthorDate: Thu Aug 7 10:31:31 2025 +0800

    [Python] Support write table for RESTCatalog (#6030)
---
 .github/workflows/e2e-tests-flink-1.x.yml          |  1 +
 .github/workflows/e2e-tests-flink-2.x-jdk11.yml    |  1 +
 .github/workflows/paimon-python-checks.yml         |  1 +
 .github/workflows/utitcase-flink-1.x.yml           |  1 +
 .github/workflows/utitcase-flink-2.x-jdk11.yml     |  1 +
 .github/workflows/utitcase-jdk11.yml               |  1 +
 .github/workflows/utitcase-spark-3.x.yml           |  1 +
 .github/workflows/utitcase-spark-4.x.yml           |  1 +
 .github/workflows/utitcase.yml                     |  1 +
 paimon-python/pypaimon/api/__init__.py             |  1 +
 paimon-python/pypaimon/api/api_response.py         |  8 --
 paimon-python/pypaimon/catalog/catalog_factory.py  |  6 +-
 paimon-python/pypaimon/catalog/catalog_utils.py    | 53 -------------
 .../pypaimon/catalog/rest/rest_catalog.py          | 64 +++++++++++++---
 .../rest/rest_token.py}                            | 39 +++++-----
 .../pypaimon/catalog/rest/rest_token_file_io.py    | 48 +++++++++++-
 paimon-python/pypaimon/common/file_io.py           |  1 +
 paimon-python/pypaimon/common/rest_json.py         | 16 +++-
 paimon-python/pypaimon/tests/rest_catalog_test.py  | 87 ++++++++++++++++++++++
 paimon-python/setup.py                             |  2 +-
 20 files changed, 237 insertions(+), 97 deletions(-)

diff --git a/.github/workflows/e2e-tests-flink-1.x.yml b/.github/workflows/e2e-tests-flink-1.x.yml
index 6d5b34321e..d3b91cf494 100644
--- a/.github/workflows/e2e-tests-flink-1.x.yml
+++ b/.github/workflows/e2e-tests-flink-1.x.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 4a70181fb4..6333f9f4b9 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 11
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 9b726534ac..9a38e9b5c2 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -24,6 +24,7 @@ on:
     paths:
       - 'paimon-python/**'
       - '!**/*.md'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   PYTHON_VERSION: "3.10"
diff --git a/.github/workflows/utitcase-flink-1.x.yml b/.github/workflows/utitcase-flink-1.x.yml
index 1bf1a04ba9..4703baf58c 100644
--- a/.github/workflows/utitcase-flink-1.x.yml
+++ b/.github/workflows/utitcase-flink-1.x.yml
@@ -24,6 +24,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 102a3e169e..40cd479001 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml
index 1fca33908a..f2e538fe89 100644
--- a/.github/workflows/utitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index 420847ad2b..5b17dc6e75 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index 65fcaa6080..605d7ded84 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 17
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index aa611de918..f16f5bf32a 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -25,6 +25,7 @@ on:
       - 'docs/**'
       - '**/*.md'
       - 'paimon-python/**'
+      - '.github/workflows/paimon-python-checks.yml'
 
 env:
   JDK_VERSION: 8
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index e914335d4d..6806733c7a 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -126,6 +126,7 @@ class RESTApi:
     PAGE_TOKEN = "pageToken"
     DATABASE_NAME_PATTERN = "databaseNamePattern"
     TABLE_NAME_PATTERN = "tableNamePattern"
+    TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
 
     def __init__(self, options: Dict[str, str], config_required: bool = True):
         self.logger = logging.getLogger(self.__class__.__name__)
diff --git a/paimon-python/pypaimon/api/api_response.py b/paimon-python/pypaimon/api/api_response.py
index 662a4eb798..17496f047e 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -135,14 +135,6 @@ class ListTablesResponse(PagedResponse[str]):
         return self.next_page_token
 
 
-@dataclass
-class RESTToken:
-    """REST authentication token"""
-
-    token: Dict[str, str]
-    expire_at_millis: int
-
-
 @dataclass
 class GetTableResponse(AuditRESTResponse):
     """Response for getting table"""
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py b/paimon-python/pypaimon/catalog/catalog_factory.py
index c8ab4b712c..36b3e4b55a 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -16,6 +16,8 @@
 # limitations under the License.
 
################################################################################
 from pypaimon.catalog.catalog import Catalog
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
 from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 from pypaimon.common.config import CatalogOptions
@@ -35,4 +37,6 @@ class CatalogFactory:
         if catalog_class is None:
             raise ValueError(f"Unknown catalog identifier: {identifier}. "
                              f"Available types: {list(CatalogFactory.CATALOG_REGISTRY.keys())}")
-        return catalog_class(catalog_options)
+        return catalog_class(
+            CatalogContext.create_from_options(Options(catalog_options))) if identifier == "rest" else catalog_class(
+            catalog_options)
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py b/paimon-python/pypaimon/catalog/catalog_utils.py
deleted file mode 100644
index 834aeee147..0000000000
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ /dev/null
@@ -1,53 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-from pathlib import Path
-from typing import Any, Callable
-
-from pypaimon.catalog.table_metadata import TableMetadata
-from pypaimon.common.core_options import CoreOptions
-from pypaimon.common.identifier import Identifier
-from pypaimon.table.catalog_environment import CatalogEnvironment
-from pypaimon.table.file_store_table import FileStoreTable
-from pypaimon.table.file_store_table_factory import FileStoreTableFactory
-
-
-class CatalogUtils:
-    @staticmethod
-    def load_table(
-            identifier: Identifier,
-            internal_file_io: Callable[[Path], Any],
-            external_file_io: Callable[[Path], Any],
-            metadata_loader: Callable[[Identifier], TableMetadata],
-    ) -> FileStoreTable:
-        metadata = metadata_loader(identifier)
-        schema = metadata.schema
-        data_file_io = external_file_io if metadata.is_external else internal_file_io
-        catalog_env = CatalogEnvironment(
-            identifier=identifier,
-            uuid=metadata.uuid,
-            catalog_loader=None,
-            supports_version_management=False
-        )
-
-        path = Path(schema.options.get(CoreOptions.PATH))
-        table = FileStoreTableFactory.create(data_file_io(path), path, schema, catalog_env)
-        return table
-
-    @staticmethod
-    def is_system_database(database_name: str) -> bool:
-        return Catalog.SYSTEM_DATABASE_NAME.equals(database_name)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 1d32b1f43f..2a0be5b2ea 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -16,14 +16,19 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 from pathlib import Path
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union, Callable, Any
+from urllib.parse import urlparse
+
+
+from pypaimon.api import RESTApi, CatalogOptions
+from pypaimon.api.api_response import PagedList, GetTableResponse
+
+from pypaimon.common.file_io import FileIO
 
-from pypaimon.api import CatalogOptions, RESTApi
-from pypaimon.api.api_response import GetTableResponse, PagedList
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog import Catalog
 from pypaimon.catalog.catalog_context import CatalogContext
-from pypaimon.catalog.catalog_utils import CatalogUtils
+
 from pypaimon.catalog.database import Database
 from pypaimon.catalog.property_change import PropertyChange
 from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
@@ -32,11 +37,13 @@ from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.catalog_environment import CatalogEnvironment
 from pypaimon.table.file_store_table import FileStoreTable
 
 
 class RESTCatalog(Catalog):
     def __init__(self, context: CatalogContext, config_required: Optional[bool] = True):
+        self.warehouse = context.options.get(CatalogOptions.WAREHOUSE)
         self.api = RESTApi(context.options.to_map(), config_required)
         self.context = CatalogContext.create(Options(self.api.options), context.hadoop_conf, context.prefer_io_loader,
                                              context.fallback_io_loader)
@@ -87,7 +94,7 @@ class RESTCatalog(Catalog):
     def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable:
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
-        return CatalogUtils.load_table(
+        return self.load_table(
             identifier,
             lambda path: self.file_io_for_data(path, identifier),
             self.file_io_from_options,
@@ -95,7 +102,9 @@ class RESTCatalog(Catalog):
         )
 
     def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
-        raise ValueError("Not implemented")
+        if not isinstance(identifier, Identifier):
+            identifier = Identifier.from_string(identifier)
+        self.api.create_table(identifier, schema)
 
     def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
         response = self.api.get_table(identifier)
@@ -117,8 +126,41 @@ class RESTCatalog(Catalog):
             uuid=response.get_id()
         )
 
-    def file_io_from_options(self, path: Path):
-        return None
-
-    def file_io_for_data(self, path: Path, identifier: Identifier):
-        return RESTTokenFileIO(identifier, path, None, None) if self.data_token_enabled else None
+    def file_io_from_options(self, table_path: Path) -> FileIO:
+        return FileIO(str(table_path), self.context.options.data)
+
+    def file_io_for_data(self, table_path: Path, identifier: Identifier):
+        return RESTTokenFileIO(identifier, table_path, self.context.options.data) \
+            if self.data_token_enabled else self.file_io_from_options(table_path)
+
+    def load_table(self,
+                   identifier: Identifier,
+                   internal_file_io: Callable[[Path], Any],
+                   external_file_io: Callable[[Path], Any],
+                   metadata_loader: Callable[[Identifier], TableMetadata],
+                   ) -> FileStoreTable:
+        metadata = metadata_loader(identifier)
+        schema = metadata.schema
+        data_file_io = external_file_io if metadata.is_external else internal_file_io
+        catalog_env = CatalogEnvironment(
+            identifier=identifier,
+            uuid=metadata.uuid,
+            catalog_loader=None,
+            supports_version_management=False
+        )
+        path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
+        path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
+        table = self.create(data_file_io(path),
+                            Path(path_parsed.netloc + "/" + path_parsed.path),
+                            schema,
+                            catalog_env)
+        return table
+
+    def create(self,
+               file_io: FileIO,
+               table_path: Path,
+               table_schema: TableSchema,
+               catalog_environment: CatalogEnvironment
+               ) -> FileStoreTable:
+        """Create FileStoreTable with dynamic options and catalog environment"""
+        return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema)
diff --git a/paimon-python/pypaimon/table/file_store_table_factory.py b/paimon-python/pypaimon/catalog/rest/rest_token.py
similarity index 53%
rename from paimon-python/pypaimon/table/file_store_table_factory.py
rename to paimon-python/pypaimon/catalog/rest/rest_token.py
index 81ec64777b..716c508abc 100644
--- a/paimon-python/pypaimon/table/file_store_table_factory.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token.py
@@ -15,21 +15,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-from pathlib import Path
-
-from pypaimon.common.file_io import FileIO
-from pypaimon.schema.table_schema import TableSchema
-from pypaimon.table.catalog_environment import CatalogEnvironment
-from pypaimon.table.file_store_table import FileStoreTable
-
-
-class FileStoreTableFactory:
-    @staticmethod
-    def create(
-            file_io: FileIO,
-            table_path: Path,
-            table_schema: TableSchema,
-            catalog_environment: CatalogEnvironment
-    ) -> FileStoreTable:
-        """Create FileStoreTable with dynamic options and catalog environment"""
-        return FileStoreTable(file_io, catalog_environment.identifier, table_path, table_schema)
+from typing import Dict, Optional
+
+
+class RESTToken:
+
+    def __init__(self, token: Dict[str, str], expire_at_millis: int):
+        self.token = token
+        self.expire_at_millis = expire_at_millis
+        self.hash: Optional[int] = None
+
+    def __eq__(self, other: object) -> bool:
+        if other is None or not isinstance(other, RESTToken):
+            return False
+
+        return (self.expire_at_millis == other.expire_at_millis and
+                self.token == other.token)
+
+    def __hash__(self) -> int:
+        if self.hash is None:
+            self.hash = hash((frozenset(self.token.items()), self.expire_at_millis))
+        return self.hash
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 291efdbf2d..003e9a4ef2 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -15,20 +15,60 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import logging
+import threading
+import time
 from pathlib import Path
 from typing import Optional
 
+from pyarrow._fs import FileSystem
+
+from pypaimon.api import RESTApi
+from pypaimon.catalog.rest.rest_token import RESTToken
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
 
 
 class RESTTokenFileIO(FileIO):
 
-    def __init__(self, identifier: Identifier, path: Path, warehouse: Optional[str] = None,
+    def __init__(self, identifier: Identifier, path: Path,
                  catalog_options: Optional[dict] = None):
-        super().__init__(warehouse, catalog_options)
         self.identifier = identifier
         self.path = path
+        self.token: Optional[RESTToken] = None
+        self.api_instance: Optional[RESTApi] = None
+        self.lock = threading.Lock()
+        self.log = logging.getLogger(__name__)
+        super().__init__(str(path), catalog_options)
+
+    def _initialize_oss_fs(self) -> FileSystem:
+        self.try_to_refresh_token()
+        self.properties.update(self.token.token)
+        return super()._initialize_oss_fs()
+
+    def try_to_refresh_token(self):
+        if self.should_refresh():
+            with self.lock:
+                if self.should_refresh():
+                    self.refresh_token()
+
+    def should_refresh(self):
+        if self.token is None:
+            return True
+        current_time = int(time.time() * 1000)
+        return (self.token.expire_at_millis - current_time) < RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS
+
+    def refresh_token(self):
+        self.log.info(f"begin refresh data token for identifier [{self.identifier}]")
+        if self.api_instance is None:
+            self.api_instance = RESTApi(self.properties, False)
+
+        response = self.api_instance.load_table_token(self.identifier)
+        self.log.info(
+            f"end refresh data token for identifier [{self.identifier}] expiresAtMillis [{response.expires_at_millis}]"
+        )
+        self.token = RESTToken(response.token, response.expires_at_millis)
 
-    def exists(self, path: Path) -> bool:
-        pass
+    def valid_token(self):
+        self.try_to_refresh_token()
+        return self.token
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f1b40e2239..50ecae96cb 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -26,6 +26,7 @@ from urllib.parse import splitport, urlparse
 import pyarrow
 from pyarrow._fs import FileSystem
 
+
 from pypaimon.common.config import OssOptions, S3Options
 from pypaimon.schema.data_types import PyarrowFieldParser
 
diff --git a/paimon-python/pypaimon/common/rest_json.py b/paimon-python/pypaimon/common/rest_json.py
index 6448169d72..aa959e1c53 100644
--- a/paimon-python/pypaimon/common/rest_json.py
+++ b/paimon-python/pypaimon/common/rest_json.py
@@ -81,6 +81,8 @@ class JSON:
                 field_type = args[0]
             if is_dataclass(field_type):
                 type_mapping[json_name] = field_type
+            elif origin_type is list and is_dataclass(args[0]):
+                type_mapping[json_name] = field_info.type
 
         # Map JSON data to field names
         kwargs = {}
@@ -88,7 +90,19 @@ class JSON:
             if json_name in field_mapping:
                 field_name = field_mapping[json_name]
                 if json_name in type_mapping:
-                    kwargs[field_name] = JSON.__from_dict(value, type_mapping[json_name])
+                    tp = get_origin(type_mapping[json_name])
+                    if tp is list:
+                        item_type = get_args(type_mapping[json_name])[0]
+                        if is_dataclass(item_type):
+                            kwargs[field_name] = [
+                                item_type.from_dict(item)
+                                if hasattr(item_type, "to_dict")
+                                else JSON.__from_dict(item, item_type)
+                                for item in value]
+                        else:
+                            kwargs[field_name] = value
+                    else:
+                        kwargs[field_name] = JSON.__from_dict(value, type_mapping[json_name])
                 else:
                     kwargs[field_name] = value
 
diff --git a/paimon-python/pypaimon/tests/rest_catalog_test.py b/paimon-python/pypaimon/tests/rest_catalog_test.py
index 8200fcc658..341d463c59 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_test.py
@@ -15,23 +15,38 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import glob
 import logging
+import os
+import shutil
+import tempfile
 import unittest
 import uuid
 
+import pyarrow as pa
+
+
 from pypaimon.api import ConfigResponse, Identifier
 from pypaimon.api.auth import BearTokenAuthProvider
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 from pypaimon.catalog.table_metadata import TableMetadata
 from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
                                         MapType)
+from pypaimon.schema.schema import Schema
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.tests.rest_server import RESTCatalogServer
 
 
 class RESTCatalogTestCase(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+        self.tmp_path = os.path.join(self.temp_dir, 'test_dir')
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
 
     def test_rest_catalog(self):
         """Example usage of RESTCatalogServer"""
@@ -88,3 +103,75 @@ class RESTCatalogTestCase(unittest.TestCase):
             # Shutdown server
             server.shutdown()
             print("Server stopped")
+
+    def test_write(self):
+        """Example usage of RESTCatalogServer"""
+        # Setup logging
+        logging.basicConfig(level=logging.INFO)
+
+        # Create config
+        config = ConfigResponse(defaults={"prefix": "mock-test"})
+        token = str(uuid.uuid4())
+        warehouse = "test_warehouse"
+        # Create server
+        server = RESTCatalogServer(
+            data_path=self.tmp_path,
+            auth_provider=BearTokenAuthProvider(token),
+            config=config,
+            warehouse=warehouse
+        )
+        try:
+            # Start server
+            server.start()
+            print(f"Server started at: {server.get_url()}")
+
+            pa_schema = pa.schema([
+                ('f0', pa.int32()),
+                ('f1', pa.string()),
+                ('f2', pa.string())
+            ])
+            fields = [
+                DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+                DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
+                DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+            ]
+            options = {
+                'metastore': 'rest',
+                'uri': f"http://localhost:{server.port}",
+                'warehouse': 'test_warehouse',
+                'dlf.region': 'cn-hangzhou',
+                "token.provider": "bear",
+                'token': token,
+
+            }
+            catalog = CatalogFactory.create(options)
+            catalog.create_database("test_db", False)
+            catalog.create_table("test_db.test_table", Schema(fields=fields), False)
+            table = catalog.get_table("test_db.test_table")
+
+            data = {
+                'f0': [1, 2, 3],
+                'f1': ['a', 'b', 'c'],
+                'f2': ['X', 'Y', 'Z']
+            }
+            expect = pa.Table.from_pydict(data, schema=pa_schema)
+
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            table_write.write_arrow(expect)
+            commit_messages = table_write.prepare_commit()
+            table_commit.commit(commit_messages)
+            table_write.close()
+            table_commit.close()
+
+            self.assertTrue(os.path.exists(self.tmp_path + "/test_warehouse/test_db/test_table/snapshot/LATEST"))
+            self.assertTrue(os.path.exists(self.tmp_path + "/test_warehouse/test_db/test_table/snapshot/snapshot-1"))
+            self.assertTrue(os.path.exists(self.tmp_path + "/test_warehouse/test_db/test_table/manifest"))
+            self.assertTrue(os.path.exists(self.tmp_path + "/test_warehouse/test_db/test_table/bucket-0"))
+            self.assertEqual(len(glob.glob(self.tmp_path + "/test_warehouse/test_db/test_table/manifest/*.avro")), 2)
+            self.assertEqual(len(glob.glob(self.tmp_path + "/test_warehouse/test_db/test_table/bucket-0/*.parquet")), 1)
+        finally:
+            # Shutdown server
+            server.shutdown()
+            print("Server stopped")
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 29e4c559d1..507844685c 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -28,7 +28,7 @@ install_requires = [
     'ossfs==2023.12.0',
     'pyarrow==16.0.0',
     'polars==1.32.0',
-    'fastavro==1.11.1',
+    'fastavro==1.11.1'
 ]
 
 long_description = "See Apache Paimon Python API \

Reply via email to