This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 568c008cdb [python] api: Support python RESTApi (#5873)
568c008cdb is described below

commit 568c008cdb2ad2b971710f3b1a7078607836ba44
Author: jerry <[email protected]>
AuthorDate: Fri Jul 11 15:20:23 2025 +0800

    [python] api: Support python RESTApi (#5873)
---
 .gitignore                 |   1 +
 python/api/__init__.py     | 264 ++++++++++++++++++++++++++++++++++++
 python/api/api_response.py | 301 +++++++++++++++++++++++++++++++++++++++++
 python/api/api_resquest.py |  43 ++++++
 python/api/auth.py         | 284 +++++++++++++++++++++++++++++++++++++++
 python/api/client.py       | 324 +++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 1217 insertions(+)

diff --git a/.gitignore b/.gitignore
index 7b1aaba72b..8d59f6faa3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ target
 .java-version
 dependency-reduced-pom.xml
 metastore_db/
+python/.idea/
 
 ### VS Code ###
 .vscode/
diff --git a/python/api/__init__.py b/python/api/__init__.py
new file mode 100644
index 0000000000..29c50f85f2
--- /dev/null
+++ b/python/api/__init__.py
@@ -0,0 +1,264 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import logging
+from typing import Dict, List, Optional, Callable
+from auth import RESTAuthFunction
+from api_response import PagedList, GetTableResponse, ListDatabasesResponse, 
ListTablesResponse, GetDatabaseResponse, \
+    ConfigResponse, PagedResponse, T
+from api_resquest import Identifier, CreateDatabaseRequest
+
+
class RESTCatalogOptions:
    """Keys looked up in the catalog options dictionary by the REST client."""

    # Endpoint / catalog location.
    URI = "uri"
    WAREHOUSE = "warehouse"
    PREFIX = "prefix"

    # Authentication.
    TOKEN_PROVIDER = "token.provider"
    DLF_REGION = "dlf.region"
    DLF_ACCESS_KEY_ID = "dlf.access-key-id"
    DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
+
+
class RESTException(Exception):
    """Root of the REST catalog exception classes defined in this module."""


class AlreadyExistsException(RESTException):
    """RESTException variant for the 'already exists' case."""


class ForbiddenException(RESTException):
    """RESTException variant for the 'forbidden' case."""


class NoSuchResourceException(RESTException):
    """RESTException variant for the 'no such resource' case."""
+
+
class RESTUtil:
    """Small string/dict helpers used when building REST requests."""

    @staticmethod
    def encode_string(value: str) -> str:
        """Percent-encode ``value`` with ``urllib.parse.quote`` defaults ('/' is left as-is)."""
        from urllib.parse import quote
        return quote(value)

    @staticmethod
    def extract_prefix_map(options: Dict[str, str], prefix: str) -> Dict[str, str]:
        """Return the entries of ``options`` whose key starts with ``prefix``.

        The prefix is removed from each returned key and every value is
        converted with ``str``. Insertion order of ``options`` is kept.
        """
        cut = len(prefix)
        return {
            key[cut:]: str(value)
            for key, value in options.items()
            if key.startswith(prefix)
        }
+
+
class ResourcePaths:
    """Builds REST endpoint paths below an optional base path.

    Database and table names are percent-encoded as single path segments
    (``/`` included), so names containing reserved URL characters such as
    ``/``, ``?``, ``#`` or spaces cannot change the structure of the path.
    Plain identifier names are unaffected by the encoding.
    """

    V1 = "v1"
    DATABASES = "databases"
    TABLES = "tables"
    TABLE_DETAILS = "table-details"

    def __init__(self, base_path: str = ""):
        # Drop trailing slashes so appending "/<segment>" never yields "//".
        self.base_path = base_path.rstrip('/')

    @staticmethod
    def _segment(name: str) -> str:
        """Percent-encode ``name`` so it forms exactly one URL path segment."""
        import urllib.parse
        return urllib.parse.quote(name, safe="")

    @classmethod
    def for_catalog_properties(cls, options: dict[str, str]) -> 'ResourcePaths':
        """Create paths rooted at ``/v1/<prefix>``, or ``/v1`` when no prefix option is set."""
        prefix = options.get(RESTCatalogOptions.PREFIX, "")
        return cls(f"/{cls.V1}/{prefix}" if prefix else f"/{cls.V1}")

    def config(self) -> str:
        """Config endpoint path; always ``/v1/config`` regardless of the base path."""
        return f"/{self.V1}/config"

    def databases(self) -> str:
        """Path of the database collection."""
        return f"{self.base_path}/{self.DATABASES}"

    def database(self, name: str) -> str:
        """Path of a single database."""
        return f"{self.base_path}/{self.DATABASES}/{self._segment(name)}"

    def tables(self, database_name: Optional[str] = None) -> str:
        """Tables of ``database_name``; the catalog-level tables path if it is empty/None."""
        if database_name:
            return f"{self.database(database_name)}/{self.TABLES}"
        return f"{self.base_path}/{self.TABLES}"

    def table(self, database_name: str, table_name: str) -> str:
        """Path of a single table inside ``database_name``."""
        return f"{self.database(database_name)}/{self.TABLES}/{self._segment(table_name)}"

    def table_details(self, database_name: str) -> str:
        """Path of the table-details listing of ``database_name``."""
        return f"{self.database(database_name)}/{self.TABLE_DETAILS}"
+
+
class RESTApi:
    """Python client for the Paimon REST catalog API.

    Resolves endpoint paths with ``ResourcePaths``, attaches authentication
    headers through a ``RESTAuthFunction`` and performs the HTTP calls with
    ``HttpClient``.
    """

    HEADER_PREFIX = "header."
    MAX_RESULTS = "maxResults"
    PAGE_TOKEN = "pageToken"
    DATABASE_NAME_PATTERN = "databaseNamePattern"
    TABLE_NAME_PATTERN = "tableNamePattern"

    def __init__(self, options: Dict[str, str], config_required: bool = True):
        """Create the client.

        Args:
            options: catalog options keyed by ``RESTCatalogOptions`` names.
                Entries named ``header.<name>`` are sent as HTTP header
                ``<name>``.
            config_required: when true, first call the config endpoint and
                merge the returned configuration into ``options``.
        """
        self.logger = logging.getLogger(self.__class__.__name__)

        # Imported here: client.py imports this module at its top level, so
        # importing it at module top would be circular.
        from client import HttpClient
        from auth import DLFAuthProvider, DLFToken

        self.client = HttpClient(options.get(RESTCatalogOptions.URI))
        auth_provider = DLFAuthProvider(
            DLFToken(options), options.get(RESTCatalogOptions.DLF_REGION)
        )
        base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX)

        if config_required:
            warehouse = options.get(RESTCatalogOptions.WAREHOUSE)
            query_params = {}
            if warehouse:
                query_params[RESTCatalogOptions.WAREHOUSE] = RESTUtil.encode_string(warehouse)

            # The config request is signed without the user-supplied extra headers.
            config_response = self.client.get_with_params(
                ResourcePaths().config(),
                query_params,
                ConfigResponse,
                RESTAuthFunction({}, auth_provider)
            )
            options = config_response.merge(options)
            base_headers.update(RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX))

        self.rest_auth_function = RESTAuthFunction(base_headers, auth_provider)
        self.options = options
        self.resource_paths = ResourcePaths.for_catalog_properties(options)

    @staticmethod
    def __build_paged_query_params(max_results: Optional[int],
                                   page_token: Optional[str],
                                   name_patterns: Dict[str, Optional[str]]) -> Dict[str, str]:
        """Build query parameters for one paged list request.

        ``max_results`` is included only when positive, ``page_token`` only
        when it is non-blank, and each name pattern only when both its key
        and value are non-blank.
        """
        query_params: Dict[str, str] = {}
        if max_results is not None and max_results > 0:
            query_params[RESTApi.MAX_RESULTS] = str(max_results)

        if page_token is not None and page_token.strip():
            query_params[RESTApi.PAGE_TOKEN] = page_token

        # .items(): iterating the dict itself would yield keys only.
        for key, value in name_patterns.items():
            if key and value and key.strip() and value.strip():
                query_params[key] = value

        return query_params

    def __list_data_from_page_api(
            self,
            page_api: Callable[[Dict[str, str]], "PagedResponse[T]"]) -> "List[T]":
        """Call ``page_api`` repeatedly, following page tokens, and concatenate all pages.

        Stops when a page reports no next-page token or carries no data (the
        latter guards against a server that keeps returning a token).
        """
        results: List = []
        query_params: Dict[str, str] = {}

        while True:
            response = page_api(query_params)
            page = response.data()
            if page:
                results.extend(page)

            next_token = response.get_next_page_token()
            if not next_token or not page:
                break
            query_params = {RESTApi.PAGE_TOKEN: next_token}

        return results

    def get_options(self) -> dict[str, str]:
        """Return the effective options (after merging the config response, if fetched)."""
        return self.options

    def list_databases(self) -> List[str]:
        """Return the names of all databases, fetching every page."""
        return self.__list_data_from_page_api(
            lambda query_params: self.client.get_with_params(
                self.resource_paths.databases(),
                query_params,
                ListDatabasesResponse,
                self.rest_auth_function
            )
        )

    def list_databases_paged(self,
                             max_results: Optional[int] = None,
                             page_token: Optional[str] = None,
                             database_name_pattern: Optional[str] = None) -> "PagedList[str]":
        """Return a single page of database names plus the next-page token."""
        response = self.client.get_with_params(
            self.resource_paths.databases(),
            self.__build_paged_query_params(
                max_results,
                page_token,
                {self.DATABASE_NAME_PATTERN: database_name_pattern}
            ),
            ListDatabasesResponse,
            self.rest_auth_function
        )

        databases = response.data() or []
        return PagedList(databases, response.get_next_page_token())

    def create_database(self, name: str, properties: Dict[str, str]) -> None:
        """POST a ``CreateDatabaseRequest`` for ``name`` with ``properties``."""
        request = CreateDatabaseRequest(name, properties)
        self.client.post(self.resource_paths.databases(), request, self.rest_auth_function)

    def get_database(self, name: str) -> "GetDatabaseResponse":
        """Fetch the metadata of database ``name``."""
        return self.client.get(
            self.resource_paths.database(name),
            GetDatabaseResponse,
            self.rest_auth_function
        )

    def drop_database(self, name: str) -> None:
        """Send a DELETE for database ``name``."""
        self.client.delete(self.resource_paths.database(name), self.rest_auth_function)

    def list_tables(self, database_name: str) -> List[str]:
        """Return the names of all tables in ``database_name``, fetching every page."""
        return self.__list_data_from_page_api(
            lambda query_params: self.client.get_with_params(
                self.resource_paths.tables(database_name),
                query_params,
                ListTablesResponse,
                self.rest_auth_function
            )
        )

    def list_tables_paged(self,
                          database_name: str,
                          max_results: Optional[int] = None,
                          page_token: Optional[str] = None,
                          table_name_pattern: Optional[str] = None) -> "PagedList[str]":
        """Return a single page of table names in ``database_name`` plus the next-page token."""
        response = self.client.get_with_params(
            self.resource_paths.tables(database_name),
            self.__build_paged_query_params(
                max_results,
                page_token,
                {self.TABLE_NAME_PATTERN: table_name_pattern}
            ),
            ListTablesResponse,
            self.rest_auth_function
        )

        tables = response.data() or []
        return PagedList(tables, response.get_next_page_token())

    def get_table(self, identifier: "Identifier") -> "GetTableResponse":
        """Fetch the table addressed by ``identifier``."""
        return self.client.get(
            self.resource_paths.table(identifier.database_name, identifier.object_name),
            GetTableResponse,
            self.rest_auth_function
        )
diff --git a/python/api/api_response.py b/python/api/api_response.py
new file mode 100644
index 0000000000..332741547e
--- /dev/null
+++ b/python/api/api_response.py
@@ -0,0 +1,301 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Any, Generic, TypeVar, List
+from dataclasses import dataclass, field
+import json
+from datetime import datetime
+from api_resquest import Identifier
+
T = TypeVar("T")  # element type of paginated results


@dataclass
class PagedList(Generic[T]):
    """One page of results together with the token for the following page.

    ``next_page_token`` defaults to ``None``.
    """

    elements: List[T]
    next_page_token: Optional[str] = None
+
+
class RESTResponse(ABC):
    """Common base class of all REST response payload types."""
+
+
@dataclass
class ErrorResponse(RESTResponse):
    """Error payload parsed from a REST error body (camelCase JSON keys)."""

    resource_type: Optional[str] = None
    resource_name: Optional[str] = None
    message: Optional[str] = None
    code: Optional[int] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
        """Build from a decoded JSON object; absent keys become ``None``."""
        lookup = data.get
        return cls(
            resource_type=lookup('resourceType'),
            resource_name=lookup('resourceName'),
            message=lookup('message'),
            code=lookup('code'),
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'ErrorResponse':
        """Parse ``json_str`` and delegate to :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))
+
+
@dataclass
class AuditRESTResponse(RESTResponse):
    """Response base class carrying owner and create/update audit metadata.

    The ``FIELD_*`` constants are the JSON keys of these values. They carry
    no type annotation, so ``@dataclass`` does not treat them as fields.
    ``created_at``/``updated_at`` are integers that the ``get_*_datetime``
    helpers interpret as milliseconds since the epoch.
    """

    FIELD_OWNER = "owner"
    FIELD_CREATED_AT = "createdAt"
    FIELD_CREATED_BY = "createdBy"
    FIELD_UPDATED_AT = "updatedAt"
    FIELD_UPDATED_BY = "updatedBy"

    owner: Optional[str] = None
    created_at: Optional[int] = None
    created_by: Optional[str] = None
    updated_at: Optional[int] = None
    updated_by: Optional[str] = None

    def get_owner(self) -> Optional[str]:
        return self.owner

    def get_created_at(self) -> Optional[int]:
        return self.created_at

    def get_created_by(self) -> Optional[str]:
        return self.created_by

    def get_updated_at(self) -> Optional[int]:
        return self.updated_at

    def get_updated_by(self) -> Optional[str]:
        return self.updated_by

    def get_created_datetime(self) -> Optional[datetime]:
        """Return ``created_at`` as a naive local-time datetime, or None when unset.

        Only ``None`` means unset; 0 (the epoch) is converted like any other value.
        """
        if self.created_at is None:
            return None
        return datetime.fromtimestamp(self.created_at / 1000)

    def get_updated_datetime(self) -> Optional[datetime]:
        """Return ``updated_at`` as a naive local-time datetime, or None when unset.

        Only ``None`` means unset; 0 (the epoch) is converted like any other value.
        """
        if self.updated_at is None:
            return None
        return datetime.fromtimestamp(self.updated_at / 1000)
+
+
class PagedResponse(RESTResponse, Generic[T]):
    """Interface of responses that carry one page of a paginated listing."""

    @abstractmethod
    def data(self) -> List[T]:
        """Return the items of this page."""

    @abstractmethod
    def get_next_page_token(self) -> str:
        """Return the token of the following page (implementations may return None)."""
+
+
@dataclass
class ListDatabasesResponse(PagedResponse[str]):
    """One page of database names from the list-databases endpoint.

    Both fields are read with ``dict.get`` in :meth:`from_dict`, so either
    may be ``None`` when the server omits it. The annotations say so, for
    consistency with ``ListTablesResponse``.
    """

    databases: Optional[List[str]]
    next_page_token: Optional[str]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
        """Build from a decoded JSON object (keys ``databases`` and ``nextPageToken``)."""
        return cls(
            databases=data.get('databases'),
            next_page_token=data.get('nextPageToken')
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'ListDatabasesResponse':
        """Parse ``json_str`` and delegate to :meth:`from_dict`."""
        data = json.loads(json_str)
        return cls.from_dict(data)

    def data(self) -> Optional[List[str]]:
        """Return the database names of this page (None if absent in the payload)."""
        return self.databases

    def get_next_page_token(self) -> Optional[str]:
        """Return the next-page token (None if absent in the payload)."""
        return self.next_page_token
+
+
@dataclass
class ListTablesResponse(PagedResponse[str]):
    """One page of table names from the list-tables endpoint."""

    tables: Optional[List[str]]
    next_page_token: Optional[str]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
        """Build from decoded JSON; absent ``tables``/``nextPageToken`` become None."""
        names = data.get('tables')
        token = data.get('nextPageToken')
        return cls(tables=names, next_page_token=token)

    @classmethod
    def from_json(cls, json_str: str) -> 'ListTablesResponse':
        """Parse ``json_str`` and delegate to :meth:`from_dict`."""
        parsed = json.loads(json_str)
        return cls.from_dict(parsed)

    def data(self) -> Optional[List[str]]:
        """Return the table names of this page, or None."""
        return self.tables

    def get_next_page_token(self) -> Optional[str]:
        """Return the next-page token, or None."""
        return self.next_page_token
+
+
@dataclass
class GetTableResponse(RESTResponse):
    """Declared shape of a get-table response.

    NOTE(review): ``from_dict``/``from_json`` return the decoded JSON dict
    unchanged rather than a ``GetTableResponse`` instance, so code parsing
    through them receives a plain ``dict``.
    """

    identifier: Identifier
    schema: Any
    properties: Dict[str, str]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        # Pass-through: no conversion into an instance is performed.
        return data

    @classmethod
    def from_json(cls, json_str: str) -> Dict[str, Any]:
        """Parse ``json_str`` and delegate to :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))
+
+
@dataclass
class GetDatabaseResponse(AuditRESTResponse):
    """Database metadata plus the inherited owner/audit fields.

    JSON keys are given by the ``FIELD_*`` constants. ``__init__``,
    ``__str__`` and ``__repr__`` are written out explicitly; ``@dataclass``
    keeps them and only generates the remaining methods (e.g. ``__eq__``).
    """

    FIELD_ID = "id"
    FIELD_NAME = "name"
    FIELD_LOCATION = "location"
    FIELD_OPTIONS = "options"

    id: Optional[str] = None
    name: Optional[str] = None
    location: Optional[str] = None
    options: Optional[Dict[str, str]] = field(default_factory=dict)

    def __init__(self,
                 id: Optional[str] = None,
                 name: Optional[str] = None,
                 location: Optional[str] = None,
                 options: Optional[Dict[str, str]] = None,
                 owner: Optional[str] = None,
                 created_at: Optional[int] = None,
                 created_by: Optional[str] = None,
                 updated_at: Optional[int] = None,
                 updated_by: Optional[str] = None):
        super().__init__(
            owner=owner,
            created_at=created_at,
            created_by=created_by,
            updated_at=updated_at,
            updated_by=updated_by,
        )
        self.id = id
        self.name = name
        self.location = location
        # Any falsy value (None, empty mapping) is replaced by a fresh dict.
        self.options = options or {}

    def get_id(self) -> Optional[str]:
        return self.id

    def get_name(self) -> Optional[str]:
        return self.name

    def get_location(self) -> Optional[str]:
        return self.location

    def get_options(self) -> Dict[str, str]:
        return self.options or {}

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a JSON-ready dict; audit entries appear only when not None."""
        payload: Dict[str, Any] = {
            self.FIELD_ID: self.id,
            self.FIELD_NAME: self.name,
            self.FIELD_LOCATION: self.location,
            self.FIELD_OPTIONS: self.options,
        }
        audit_entries = (
            (self.FIELD_OWNER, self.owner),
            (self.FIELD_CREATED_AT, self.created_at),
            (self.FIELD_CREATED_BY, self.created_by),
            (self.FIELD_UPDATED_AT, self.updated_at),
            (self.FIELD_UPDATED_BY, self.updated_by),
        )
        for key, value in audit_entries:
            if value is not None:
                payload[key] = value
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
        """Build from a decoded JSON object; absent keys become None (options: {})."""
        get = data.get
        return cls(
            id=get(cls.FIELD_ID),
            name=get(cls.FIELD_NAME),
            location=get(cls.FIELD_LOCATION),
            options=get(cls.FIELD_OPTIONS, {}),
            owner=get(cls.FIELD_OWNER),
            created_at=get(cls.FIELD_CREATED_AT),
            created_by=get(cls.FIELD_CREATED_BY),
            updated_at=get(cls.FIELD_UPDATED_AT),
            updated_by=get(cls.FIELD_UPDATED_BY),
        )

    def to_json(self) -> str:
        """Serialize :meth:`to_dict` as indented, non-ASCII-preserving JSON."""
        return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'GetDatabaseResponse':
        """Parse ``json_str`` and delegate to :meth:`from_dict`."""
        return cls.from_dict(json.loads(json_str))

    def __str__(self) -> str:
        return f"GetDatabaseResponse(id={self.id}, name={self.name}, location={self.location})"

    def __repr__(self) -> str:
        return (f"GetDatabaseResponse(id={self.id!r}, name={self.name!r}, "
                f"location={self.location!r}, options={self.options!r}, "
                f"owner={self.owner!r}, created_at={self.created_at}, "
                f"created_by={self.created_by!r}, updated_at={self.updated_at}, "
                f"updated_by={self.updated_by!r})")
+
+
@dataclass
class ConfigResponse(RESTResponse):
    """Payload of the config endpoint: server-side default option values."""

    defaults: Optional[Dict[str, Any]]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConfigResponse':
        """Build from a decoded JSON object; a missing ``defaults`` key gives None."""
        return cls(
            defaults=data.get('defaults')
        )

    @classmethod
    def from_json(cls, json_str: str) -> 'ConfigResponse':
        """Parse ``json_str`` and delegate to :meth:`from_dict`."""
        data = json.loads(json_str)
        return cls.from_dict(data)

    def merge(self, options: Dict[str, Any]) -> Dict[str, Any]:
        """Return the server defaults overlaid with the client-supplied ``options``.

        Explicitly configured client options win over server defaults. A
        missing (None) ``defaults`` payload is treated as empty. Neither input
        is modified.
        """
        merged: Dict[str, Any] = dict(self.defaults) if self.defaults else {}
        merged.update(options)
        return merged
+
+
class JSONSerializableGetDatabaseResponse(GetDatabaseResponse):
    """``GetDatabaseResponse`` exposing ``__json__``/``__from_json__`` hooks.

    Both hooks delegate to ``to_dict`` / ``from_dict``.
    """

    def __init__(self, *args, **kwargs):
        # Straight pass-through to GetDatabaseResponse.__init__.
        super().__init__(*args, **kwargs)

    def __json__(self) -> Dict[str, Any]:
        payload = self.to_dict()
        return payload

    @classmethod
    def __from_json__(cls, data: Dict[str, Any]) -> 'JSONSerializableGetDatabaseResponse':
        instance = cls.from_dict(data)
        return instance
diff --git a/python/api/api_resquest.py b/python/api/api_resquest.py
new file mode 100644
index 0000000000..d7fd882447
--- /dev/null
+++ b/python/api/api_resquest.py
@@ -0,0 +1,43 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from abc import ABC
+from dataclasses import dataclass
+from typing import Dict, List, Any, Optional
+
+
class RESTRequest(ABC):
    """Common base class of REST request payloads."""
+
+
@dataclass
class Identifier:
    """Two-part name of a catalog object: its database and its own name."""

    database_name: str  # containing database
    object_name: str    # object name within that database
+
+
@dataclass
class CreateDatabaseRequest(RESTRequest):
    """Payload built by ``RESTApi.create_database``: database name and properties."""

    name: str
    properties: Dict[str, str]
+
+
@dataclass
class AlterDatabaseRequest(RESTRequest):
    """Payload for altering a database: keys to remove and key/value updates."""

    removals: List[str]
    updates: Dict[str, str]
diff --git a/python/api/auth.py b/python/api/auth.py
new file mode 100644
index 0000000000..9e88651694
--- /dev/null
+++ b/python/api/auth.py
@@ -0,0 +1,284 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import base64
+import hashlib
+import hmac
+import logging
+import threading
+from abc import ABC, abstractmethod
+from collections import OrderedDict
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Optional, Dict
+
+
@dataclass
class RESTAuthParameter:
    """Request details that the auth provider signs."""

    method: str                 # HTTP method, first line of the canonical request
    path: str                   # resource path, second line of the canonical request
    data: str                   # request body; a non-empty body adds Content-MD5
    parameters: Dict[str, str]  # query parameters, sorted into the canonical query string
+
+
@dataclass
class DLFToken:
    """DLF access-key credentials read from the catalog options.

    ``security_token`` is not read from the options. It keeps its
    class-level default of ``None`` unless assigned afterwards.
    """

    access_key_id: str
    access_key_secret: str
    security_token: Optional[str] = None

    def __init__(self, options: Dict[str, str]):
        # Imported here: the api package imports this module at its top level,
        # so a module-level import would be circular.
        from api import RESTCatalogOptions
        self.access_key_id = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID)
        self.access_key_secret = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET)

    def __repr__(self) -> str:
        # Replaces the dataclass-generated repr, which would print the secret
        # and the security token verbatim (e.g. into logs or tracebacks).
        masked_token = None if self.security_token is None else "******"
        return (f"DLFToken(access_key_id={self.access_key_id!r}, "
                f"access_key_secret='******', security_token={masked_token!r})")
+
+
class AuthProvider(ABC):
    """Interface for adding authentication headers to an outgoing request."""

    @abstractmethod
    def merge_auth_header(self, base_header: Dict[str, str], parammeter: RESTAuthParameter) -> Dict[str, str]:
        """Return ``base_header`` combined with auth headers for the described request."""
+
+
class RESTAuthFunction:
    """Callable producing the headers for a request.

    Keeps a private copy of the initial headers and asks the auth provider
    to merge authentication headers into them on each call.
    """

    def __init__(self, init_header: Dict[str, str], auth_provider: AuthProvider):
        self.init_header = {} if not init_header else init_header.copy()
        self.auth_provider = auth_provider

    def __call__(self, rest_auth_parameter: RESTAuthParameter) -> Dict[str, str]:
        provider = self.auth_provider
        return provider.merge_auth_header(self.init_header, rest_auth_parameter)

    def apply(self, rest_auth_parameter: RESTAuthParameter) -> Dict[str, str]:
        """Alias of calling the instance."""
        return self.__call__(rest_auth_parameter)
+
+
class DLFAuthProvider(AuthProvider):
    """AuthProvider that signs requests with DLF access-key credentials.

    Adds the ``x-dlf-*`` signing headers (plus Content-Type/Content-MD5 when
    there is a body) and an ``Authorization`` header computed by
    ``DLFAuthSignature``.
    """
    DLF_AUTHORIZATION_HEADER_KEY = "Authorization"
    DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5"
    DLF_CONTENT_TYPE_KEY = "Content-Type"
    DLF_DATE_HEADER_KEY = "x-dlf-date"
    DLF_SECURITY_TOKEN_HEADER_KEY = "x-dlf-security-token"
    DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version"
    DLF_CONTENT_SHA56_HEADER_KEY = "x-dlf-content-sha256"
    DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD"

    AUTH_DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
    MEDIA_TYPE = "application/json"

    def __init__(self,
                 token: DLFToken,
                 region: str):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.token = token
        self.region = region
        self._lock = threading.Lock()

    def merge_auth_header(self,
                          base_header: Dict[str, str],
                          rest_auth_parameter: RESTAuthParameter) -> Dict[str, str]:
        """Return a copy of ``base_header`` extended with the DLF signing headers.

        Args:
            base_header: starting headers; not modified. If it has an
                ``x-dlf-date`` entry, that timestamp is signed instead of the
                current UTC time.
            rest_auth_parameter: method, path, body and query parameters of
                the request to sign.

        Raises:
            RuntimeError: if signing fails; the original error is chained.
        """
        try:
            date_time = base_header.get(
                self.DLF_DATE_HEADER_KEY.lower(),
                datetime.now(timezone.utc).strftime(self.AUTH_DATE_TIME_FORMAT)
            )
            # First 8 characters of "%Y%m%dT%H%M%SZ" are the YYYYMMDD date.
            date = date_time[:8]

            sign_headers = self.generate_sign_headers(
                rest_auth_parameter.data,
                date_time,
                self.token.security_token
            )

            authorization = DLFAuthSignature.get_authorization(
                rest_auth_parameter=rest_auth_parameter,
                dlf_token=self.token,
                region=self.region,
                headers=sign_headers,
                date_time=date_time,
                date=date
            )

            headers_with_auth = base_header.copy()
            headers_with_auth.update(sign_headers)
            headers_with_auth[self.DLF_AUTHORIZATION_HEADER_KEY] = authorization

            return headers_with_auth

        except Exception as e:
            raise RuntimeError(f"Failed to merge auth header: {e}") from e

    @classmethod
    def generate_sign_headers(cls,
                              data: Optional[str],
                              date_time: str,
                              security_token: Optional[str]) -> Dict[str, str]:
        """Build the headers that take part in the signature.

        Content-Type and Content-MD5 are added only for a non-empty body; the
        security-token header only when a token is present.
        """
        sign_headers = {}

        sign_headers[cls.DLF_DATE_HEADER_KEY] = date_time
        sign_headers[cls.DLF_CONTENT_SHA56_HEADER_KEY] = cls.DLF_CONTENT_SHA56_VALUE
        sign_headers[cls.DLF_AUTH_VERSION_HEADER_KEY] = "v1"  # DLFAuthSignature.VERSION

        if data is not None and data != "":
            sign_headers[cls.DLF_CONTENT_TYPE_KEY] = cls.MEDIA_TYPE
            sign_headers[cls.DLF_CONTENT_MD5_HEADER_KEY] = DLFAuthSignature.md5(data)

        if security_token is not None:
            sign_headers[cls.DLF_SECURITY_TOKEN_HEADER_KEY] = security_token
        return sign_headers
+
+
class DLFAuthSignature:
    """Computes the ``DLF4-HMAC-SHA256`` value of the Authorization header.

    The signature is an HMAC-SHA256 over a string derived from a canonical
    request (method, path, sorted query string, sorted signed headers and the
    content-sha256 value). The key is derived from the access-key secret, the
    date, the region, the product name and the request type.
    """
    VERSION = "v1"
    SIGNATURE_ALGORITHM = "DLF4-HMAC-SHA256"
    PRODUCT = "DlfNext"
    HMAC_SHA256 = "sha256"
    REQUEST_TYPE = "aliyun_v4_request"
    SIGNATURE_KEY = "Signature"
    NEW_LINE = "\n"
    # Only these (lower-cased) header names are included in the canonical request.
    SIGNED_HEADERS = [
        DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_CONTENT_TYPE_KEY.lower(),
        DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_DATE_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY.lower(),
        DLFAuthProvider.DLF_SECURITY_TOKEN_HEADER_KEY.lower()
    ]

    @classmethod
    def get_authorization(cls,
                          rest_auth_parameter: RESTAuthParameter,
                          dlf_token: DLFToken,
                          region: str,
                          headers: Dict[str, str],
                          date_time: str,
                          date: str) -> str:
        """Return the Authorization header value for one request.

        Raises:
            RuntimeError: if any signing step fails; the original error is chained.
        """
        try:
            canonical_request = cls.get_canonical_request(rest_auth_parameter, headers)

            string_to_sign = cls.NEW_LINE.join([
                cls.SIGNATURE_ALGORITHM,
                date_time,
                f"{date}/{region}/{cls.PRODUCT}/{cls.REQUEST_TYPE}",
                cls._sha256_hex(canonical_request)
            ])

            # Key derivation chain: secret -> date -> region -> product -> request type.
            date_key = cls._hmac_sha256(f"aliyun_v4{dlf_token.access_key_secret}".encode('utf-8'), date)
            date_region_key = cls._hmac_sha256(date_key, region)
            date_region_service_key = cls._hmac_sha256(date_region_key, cls.PRODUCT)
            signing_key = cls._hmac_sha256(date_region_service_key, cls.REQUEST_TYPE)

            result = cls._hmac_sha256(signing_key, string_to_sign)
            signature = cls._hex_encode(result)

            credential = f"{cls.SIGNATURE_ALGORITHM} Credential={dlf_token.access_key_id}/{date}/{region}/{cls.PRODUCT}/{cls.REQUEST_TYPE}"
            signature_part = f"{cls.SIGNATURE_KEY}={signature}"

            return f"{credential},{signature_part}"

        except Exception as e:
            raise RuntimeError(f"Failed to generate authorization: {e}") from e

    @classmethod
    def md5(cls, raw: str) -> str:
        """Return the base64-encoded MD5 digest of ``raw`` (UTF-8 encoded)."""
        try:
            md5_hash = hashlib.md5(raw.encode('utf-8')).digest()
            return base64.b64encode(md5_hash).decode('utf-8')
        except Exception as e:
            raise RuntimeError(f"Failed to calculate MD5: {e}") from e

    @classmethod
    def _hmac_sha256(cls, key: bytes, data: str) -> bytes:
        """Return the raw HMAC-SHA256 of UTF-8 ``data`` under ``key``."""
        try:
            return hmac.new(key, data.encode('utf-8'), hashlib.sha256).digest()
        except Exception as e:
            raise RuntimeError(f"Failed to calculate HMAC-SHA256: {e}") from e

    @classmethod
    def get_canonical_request(cls,
                              rest_auth_parameter: RESTAuthParameter,
                              headers: Dict[str, str]) -> str:
        """Build the newline-joined canonical request.

        Lines: method, path, canonical query string, one ``key:value`` line per
        signed header (sorted by lower-cased name), then the content-sha256
        value (``UNSIGNED-PAYLOAD`` if the header is absent).
        """
        canonical_request = cls.NEW_LINE.join([
            rest_auth_parameter.method,
            rest_auth_parameter.path
        ])

        canonical_query_string = cls._build_canonical_query_string(rest_auth_parameter.parameters)
        canonical_request = cls.NEW_LINE.join([canonical_request, canonical_query_string])

        sorted_signed_headers_map = cls._build_sorted_signed_headers_map(headers)
        for key, value in sorted_signed_headers_map.items():
            canonical_request = cls.NEW_LINE.join([
                canonical_request,
                f"{key}:{value}"
            ])

        content_sha256 = headers.get(
            DLFAuthProvider.DLF_CONTENT_SHA56_HEADER_KEY,
            DLFAuthProvider.DLF_CONTENT_SHA56_VALUE
        )

        return cls.NEW_LINE.join([canonical_request, content_sha256])

    @classmethod
    def _build_canonical_query_string(cls, parameters: Optional[Dict[str, str]]) -> str:
        """Join parameters sorted by key as ``k=v`` with ``&``.

        Keys and values are stripped; a parameter whose value is None or ""
        is emitted as the bare key.
        """
        if not parameters:
            return ""

        sorted_params = OrderedDict(sorted(parameters.items()))

        query_parts = []
        for key, value in sorted_params.items():
            key = cls._trim(key)
            if value is not None and value != "":
                value = cls._trim(value)
                query_parts.append(f"{key}={value}")
            else:
                query_parts.append(key)

        return "&".join(query_parts)

    @classmethod
    def _build_sorted_signed_headers_map(cls, headers: Optional[Dict[str, str]]) -> OrderedDict:
        """Return the signed headers only, lower-cased, value-trimmed and sorted by name."""
        sorted_headers = OrderedDict()

        if headers:
            for key, value in headers.items():
                lower_key = key.lower()
                if lower_key in cls.SIGNED_HEADERS:
                    sorted_headers[lower_key] = cls._trim(value)

        return OrderedDict(sorted(sorted_headers.items()))

    @classmethod
    def _sha256_hex(cls, raw: str) -> str:
        """Return the lower-case hex SHA-256 digest of UTF-8 ``raw``."""
        try:
            sha256_hash = hashlib.sha256(raw.encode('utf-8')).digest()
            return cls._hex_encode(sha256_hash)
        except Exception as e:
            raise RuntimeError(f"Failed to calculate SHA256: {e}") from e

    @classmethod
    def _hex_encode(cls, raw: Optional[bytes]) -> Optional[str]:
        """Return ``raw`` as lower-case hex, or None when ``raw`` is None."""
        if raw is None:
            return None

        return raw.hex()

    @classmethod
    def _trim(cls, value: str) -> str:
        """Strip surrounding whitespace; falsy values become ""."""
        return value.strip() if value else ""
diff --git a/python/api/client.py b/python/api/client.py
new file mode 100644
index 0000000000..409b4d2c28
--- /dev/null
+++ b/python/api/client.py
@@ -0,0 +1,324 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import logging
+import urllib.parse
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Type, TypeVar, Callable
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3 import Retry
+
+from auth import RESTAuthParameter
+from api import RESTApi
+from api_response import ErrorResponse
+
# Generic response type used by the RESTClient/HttpClient method signatures.
# The bound is a forward reference by name only: 'RESTResponse' is neither
# defined nor imported in this module (NOTE(review): presumably declared in
# api_response; confirm so type checkers can resolve it).
T = TypeVar('T', bound='RESTResponse')
+
+
class RESTRequest(ABC):
    """Base type for request bodies passed to RESTClient methods.

    It declares no abstract methods. HttpClient turns instances into the
    request payload via ``RESTApi.to_json``.
    """
+
+
class RESTException(Exception):
    """Error raised for failures while building, sending or decoding REST calls.

    Args:
        message: human readable description of the failure.
        cause: optional underlying exception. It is stored on ``self.cause``
            and, when given, also installed as ``__cause__`` so tracebacks
            report it as the direct cause, even if this exception is raised
            outside the ``except`` block that caught the original error.
    """

    def __init__(self, message: str, cause: Optional[Exception] = None):
        super().__init__(message)
        self.cause = cause
        if cause is not None:
            # Only assign when present: setting __cause__ (even to None) also
            # sets __suppress_context__, which would hide implicit chaining.
            self.__cause__ = cause
+
+
class ErrorHandler(ABC):
    """Strategy invoked for responses whose HTTP status is not OK.

    HttpClient parses the error body into an ``ErrorResponse`` and calls
    ``accept`` with it and the request id taken from the response headers.
    """

    @abstractmethod
    def accept(self, error: ErrorResponse, request_id: str) -> None:
        """React to ``error`` for request ``request_id``.

        Implementations may raise; the default handler in this module raises
        RESTException.
        """
+
+
class DefaultErrorHandler(ErrorHandler):
    """ErrorHandler that converts every error response into a RESTException.

    A shared instance is created lazily by ``get_instance``.
    """

    _instance = None

    @classmethod
    def get_instance(cls) -> 'DefaultErrorHandler':
        """Return the shared handler, creating it on first use."""
        instance = cls._instance
        if instance is None:
            instance = cls._instance = cls()
        return instance

    def accept(self, error: ErrorResponse, request_id: str) -> None:
        """Raise a RESTException describing ``error`` and its resource, if any."""
        parts = [f"REST API error (request_id: {request_id}): {error.message}"]
        if error.resource_name:
            parts.append(f" (resource: {error.resource_name})")
        if error.resource_type:
            parts.append(f" (resource_type: {error.resource_type})")
        raise RESTException("".join(parts))
+
+
class ExponentialRetryInterceptor:
    """Builds the urllib3 retry policy mounted on HttpClient's session.

    The policy retries up to ``max_retries`` times on HTTP 429/500/502/503/504
    using urllib3's exponential backoff (``backoff_factor=1``). After the last
    attempt, the final response is returned instead of raising
    (``raise_on_status=False``).
    """

    # NOTE(review): POST is not idempotent, so retrying it may repeat
    # server-side effects. It is kept here to preserve the existing behaviour.
    RETRY_METHODS = ["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"]
    RETRY_STATUS_CODES = [429, 500, 502, 503, 504]

    def __init__(self, max_retries: int = 5):
        self.max_retries = max_retries
        self.logger = logging.getLogger(self.__class__.__name__)

    def create_retry_strategy(self) -> Retry:
        """Return a new ``urllib3.Retry`` configured as described on the class."""
        common = dict(
            total=self.max_retries,
            status_forcelist=list(self.RETRY_STATUS_CODES),
            backoff_factor=1,
            raise_on_status=False,
        )
        try:
            # urllib3 >= 1.26 names this option `allowed_methods`; the old
            # `method_whitelist` keyword was removed in urllib3 2.0.
            return Retry(allowed_methods=list(self.RETRY_METHODS), **common)
        except TypeError:
            # urllib3 < 1.26 only understands `method_whitelist`.
            return Retry(method_whitelist=list(self.RETRY_METHODS), **common)
+
+
class LoggingInterceptor:
    """DEBUG-level logging of HTTP requests and responses.

    Each line is tagged with the ``x-request-id`` header value, or
    ``"unknown"`` when that header is absent.
    """

    REQUEST_ID_KEY = "x-request-id"
    DEFAULT_REQUEST_ID = "unknown"

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @classmethod
    def _request_id(cls, headers: Optional[Dict[str, str]]) -> str:
        """Return the request id from ``headers``, matching the name case-insensitively.

        HTTP header names are case-insensitive. Callers may pass a plain
        ``dict`` (e.g. a copy of response headers), which loses any
        case-insensitive lookup, so an exact-key miss falls back to a scan.
        ``None`` or empty ``headers`` yield ``DEFAULT_REQUEST_ID``.
        """
        if not headers:
            return cls.DEFAULT_REQUEST_ID
        if cls.REQUEST_ID_KEY in headers:
            return headers[cls.REQUEST_ID_KEY]
        for name, value in headers.items():
            if isinstance(name, str) and name.lower() == cls.REQUEST_ID_KEY:
                return value
        return cls.DEFAULT_REQUEST_ID

    def log_request(self, method: str, url: str, headers: Dict[str, str]) -> None:
        """Log the outgoing ``method`` and ``url`` with the request id."""
        # Skip the header scan entirely when DEBUG is disabled.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Request [%s]: %s %s", self._request_id(headers), method, url)

    def log_response(self, status_code: int, headers: Dict[str, str]) -> None:
        """Log the response ``status_code`` with the request id."""
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Response [%s]: %s", self._request_id(headers), status_code)
+
+
class RESTClient(ABC):
    """Abstract HTTP client used by the REST API layer.

    Every operation takes ``rest_auth_function``. It receives a
    ``RESTAuthParameter`` describing the request and returns the headers to
    send with it (presumably authentication/signing headers; confirm against
    the auth provider).
    """

    @abstractmethod
    def get(self, path: str, response_type: Type[T],
            rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """GET ``path`` and decode the response body as ``response_type``."""

    @abstractmethod
    def get_with_params(self, path: str, query_params: Dict[str, str],
                        response_type: Type[T],
                        rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """GET ``path`` with ``query_params`` and decode the body as ``response_type``."""

    @abstractmethod
    def post(self, path: str, body: RESTRequest,
             rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """POST ``body`` to ``path`` without decoding a response."""

    @abstractmethod
    def post_with_response_type(self, path: str, body: RESTRequest, response_type: Type[T],
                                rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """POST ``body`` to ``path`` and decode the body as ``response_type``."""

    @abstractmethod
    def delete(self, path: str,
               rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """DELETE ``path`` without a request body."""

    @abstractmethod
    def delete_with_body(self, path: str, body: RESTRequest,
                         rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """DELETE ``path`` sending ``body`` as the request payload."""
+
+
+def _normalize_uri(uri: str) -> str:
+    if not uri or uri.strip() == "":
+        raise ValueError("uri is empty which must be defined.")
+
+    server_uri = uri.strip()
+
+    if server_uri.endswith("/"):
+        server_uri = server_uri[:-1]
+
+    if not server_uri.startswith("http://";) and not 
server_uri.startswith("https://";):
+        server_uri = f"http://{server_uri}";
+
+    return server_uri
+
+
def _parse_error_response(response_body: Optional[str], status_code: int) -> ErrorResponse:
    """Turn an error response body into an ``ErrorResponse``.

    A body that ``ErrorResponse.from_json`` accepts is returned as parsed.
    Otherwise a plain ``ErrorResponse`` carrying ``status_code`` is built.
    Its message is the raw body, or ``"response body is null"`` when the body
    is missing or empty.
    """
    def _plain(message: str) -> ErrorResponse:
        return ErrorResponse(
            resource_type=None,
            resource_name=None,
            message=message,
            code=status_code
        )

    if not response_body:
        return _plain("response body is null")

    try:
        return ErrorResponse.from_json(response_body)
    except Exception:
        # Body is not a structured error document; keep it verbatim.
        return _plain(response_body)
+
+
def _get_headers_with_params(path: str, query_params: Dict[str, str],
                             method: str, data: str,
                             header_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> Dict[str, str]:
    """Describe the request as a ``RESTAuthParameter`` and return ``header_function``'s headers."""
    auth_parameter = RESTAuthParameter(
        path=path, parameters=query_params, method=method, data=data
    )
    headers = header_function(auth_parameter)
    return headers
+
+
def _get_headers(path: str, method: str, query_params: Dict[str, str], data: str,
                 header_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> Dict[str, str]:
    """Compute request headers by delegating to ``_get_headers_with_params``.

    All five arguments are required. Note the order differs from
    ``_get_headers_with_params``: here ``method`` precedes ``query_params``.
    """
    return _get_headers_with_params(
        path=path,
        query_params=query_params,
        method=method,
        data=data,
        header_function=header_function,
    )
+
+
class HttpClient(RESTClient):
    """``requests``-based implementation of ``RESTClient``.

    Requests are sent to ``<normalized uri><path>`` through a session whose
    adapters apply ``ExponentialRetryInterceptor``'s retry policy. Responses
    with a non-OK status are passed to the configured ``ErrorHandler``. Any
    other failure surfaces as ``RESTException``.
    """

    # (connect, read) timeout in seconds applied to every request.
    DEFAULT_TIMEOUT = (180, 180)

    def __init__(self, uri: str, timeout=DEFAULT_TIMEOUT, max_retries: int = 5):
        """Create a client for ``uri``.

        Args:
            uri: server address, normalized by ``_normalize_uri``.
            timeout: value passed as ``timeout=`` to every request; defaults
                to ``(180, 180)``.
            max_retries: retry budget for the urllib3 retry policy.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.uri = _normalize_uri(uri)
        self.error_handler = DefaultErrorHandler.get_instance()
        self.logging_interceptor = LoggingInterceptor()
        # requests.Session ignores a `timeout` attribute, so the timeout must
        # be passed on each request (see _execute_request).
        self.timeout = timeout

        self.session = requests.Session()

        retry_interceptor = ExponentialRetryInterceptor(max_retries=max_retries)
        adapter = HTTPAdapter(max_retries=retry_interceptor.create_retry_strategy())
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.session.headers.update({
            'Accept': 'application/json'
        })

    def set_error_handler(self, error_handler: ErrorHandler) -> None:
        """Replace the handler used for non-OK responses."""
        self.error_handler = error_handler

    def get(self, path: str, response_type: Type[T],
            rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """GET ``path`` and decode the body as ``response_type``."""
        auth_headers = _get_headers(path, "GET", {}, "", rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("GET", url, headers=auth_headers,
                                     response_type=response_type)

    def get_with_params(self, path: str, query_params: Dict[str, str],
                        response_type: Type[T],
                        rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """GET ``path`` with ``query_params`` and decode the body as ``response_type``."""
        # NOTE(review): data is None here but "" in get(); confirm the signer
        # treats both identically.
        auth_headers = _get_headers(path, "GET", query_params, None, rest_auth_function)
        url = self._get_request_url(path, query_params)

        return self._execute_request("GET", url, headers=auth_headers,
                                     response_type=response_type)

    def post(self, path: str, body: RESTRequest,
             rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """POST ``body`` to ``path``; the response body is not decoded (returns None)."""
        return self.post_with_response_type(path, body, None, rest_auth_function)

    def post_with_response_type(self, path: str, body: RESTRequest, response_type: Optional[Type[T]],
                                rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """POST ``body`` to ``path`` and decode the body as ``response_type`` (None skips decoding)."""
        body_str = self._serialize_body(body)
        auth_headers = _get_headers(path, "POST", {}, body_str, rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("POST", url, data=body_str, headers=auth_headers,
                                     response_type=response_type)

    def delete(self, path: str,
               rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """DELETE ``path`` without a body; returns None on success."""
        auth_headers = _get_headers(path, "DELETE", {}, "", rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("DELETE", url, headers=auth_headers, response_type=None)

    def delete_with_body(self, path: str, body: RESTRequest,
                         rest_auth_function: Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
        """DELETE ``path`` sending ``body`` as payload; returns None on success."""
        body_str = self._serialize_body(body)
        auth_headers = _get_headers(path, "DELETE", {}, body_str, rest_auth_function)
        url = self._get_request_url(path, None)

        return self._execute_request("DELETE", url, data=body_str, headers=auth_headers,
                                     response_type=None)

    @staticmethod
    def _serialize_body(body: RESTRequest) -> str:
        """Serialize ``body`` with ``RESTApi.to_json``.

        Raises:
            RESTException: "build request failed." when serialization raises
                TypeError/ValueError, the errors ``json.dumps`` uses for
                unserializable or circular data. NOTE(review): assumes
                RESTApi.to_json is json-based; confirm.
        """
        try:
            return RESTApi.to_json(body)
        except (TypeError, ValueError) as e:
            raise RESTException("build request failed.", e) from e

    def _get_request_url(self, path: str, query_params: Optional[Dict[str, str]]) -> str:
        """Join the base URI with ``path`` and URL-encode ``query_params`` if any."""
        if not path or path.strip() == "":
            full_path = self.uri
        else:
            full_path = self.uri + path

        if query_params:
            query_string = urllib.parse.urlencode(query_params)
            full_path = f"{full_path}?{query_string}"

        return full_path

    def get_uri(self) -> str:
        """Return the normalized base URI."""
        return self.uri

    def _execute_request(self, method: str, url: str,
                         data: Optional[str] = None,
                         headers: Optional[Dict[str, str]] = None,
                         response_type: Optional[Type[T]] = None) -> T:
        """Send the request and decode the response.

        Returns ``None`` when ``response_type`` is None, otherwise
        ``response_type.from_json(body)``.

        Raises:
            RESTException: raised by the error handler for non-OK responses,
                when a typed response has an empty body, or wrapping any other
                failure (transport errors, decoding errors).
        """
        try:
            if headers:
                self.logging_interceptor.log_request(method, url, headers)

            response = self.session.request(
                method=method,
                url=url,
                data=data.encode('utf-8') if data else None,
                headers=headers,
                timeout=self.timeout,
            )

            # Pass requests' case-insensitive header mapping directly; a plain
            # dict copy would make the "x-request-id" lookup case-sensitive.
            self.logging_interceptor.log_response(response.status_code, response.headers)

            response_body_str = response.text or None

            if not response.ok:
                error = _parse_error_response(response_body_str, response.status_code)
                request_id = response.headers.get(
                    LoggingInterceptor.REQUEST_ID_KEY,
                    LoggingInterceptor.DEFAULT_REQUEST_ID
                )
                self.error_handler.accept(error, request_id)

            if response_type is None:
                return None
            if response_body_str is None:
                raise RESTException("response body is null.")
            return response_type.from_json(response_body_str)

        except RESTException:
            raise
        except Exception as e:
            raise RESTException("rest exception", e) from e


Reply via email to