This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b9f188e36c [python] add test for data type (#5900)
b9f188e36c is described below
commit b9f188e36cdc3c2b3586bceab8b7b4241f012996
Author: jerry <[email protected]>
AuthorDate: Wed Jul 16 14:03:25 2025 +0800
[python] add test for data type (#5900)
---
pypaimon/api/__init__.py | 22 ++-
pypaimon/api/api_response.py | 289 ++++--------------------------------
pypaimon/api/api_resquest.py | 16 +-
pypaimon/api/auth.py | 1 +
pypaimon/api/client.py | 205 +++++++++++++++++++++++---
pypaimon/api/data_types.py | 341 +++++++++++++++++++++++++++++++++++++++++++
pypaimon/api/rest_json.py | 71 ++++++---
pypaimon/api/typedef.py | 4 +-
pypaimon/tests/api_test.py | 104 ++++++++++++-
9 files changed, 739 insertions(+), 314 deletions(-)
diff --git a/pypaimon/api/__init__.py b/pypaimon/api/__init__.py
index 573fd889b9..a82d9ebb10 100644
--- a/pypaimon/api/__init__.py
+++ b/pypaimon/api/__init__.py
@@ -18,11 +18,10 @@
import logging
from typing import Dict, List, Optional, Callable
from urllib.parse import unquote
-import api
from api.auth import RESTAuthFunction
from api.api_response import PagedList, GetTableResponse,
ListDatabasesResponse, ListTablesResponse, \
GetDatabaseResponse, ConfigResponse, PagedResponse
-from api.api_resquest import CreateDatabaseRequest
+from api.api_resquest import CreateDatabaseRequest, AlterDatabaseRequest
from api.typedef import Identifier
from api.client import HttpClient
from api.auth import DLFAuthProvider, DLFToken
@@ -36,6 +35,7 @@ class RESTCatalogOptions:
DLF_REGION = "dlf.region"
DLF_ACCESS_KEY_ID = "dlf.access-key-id"
DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
+ DLF_ACCESS_SECURITY_TOKEN = "dlf.security-token"
PREFIX = 'prefix'
@@ -217,8 +217,8 @@ class RESTApi:
databases = response.data() or []
return PagedList(databases, response.get_next_page_token())
- def create_database(self, name: str, properties: Dict[str, str]) -> None:
- request = CreateDatabaseRequest(name, properties)
+ def create_database(self, name: str, options: Dict[str, str]) -> None:
+ request = CreateDatabaseRequest(name, options)
self.client.post(self.resource_paths.databases(), request,
self.rest_auth_function)
def get_database(self, name: str) -> GetDatabaseResponse:
@@ -231,6 +231,20 @@ class RESTApi:
def drop_database(self, name: str) -> None:
self.client.delete(self.resource_paths.database(name),
self.rest_auth_function)
+ def alter_database(self, name: str, removals: Optional[List[str]] = None,
+ updates: Optional[Dict[str, str]] = None):
+ if not name or not name.strip():
+ raise ValueError("Database name cannot be empty")
+ removals = removals or []
+ updates = updates or {}
+ request = AlterDatabaseRequest(removals, updates)
+
+ return self.client.post(
+ self.resource_paths.database(name),
+ request,
+ self.rest_auth_function
+ )
+
def list_tables(self, database_name: str) -> List[str]:
return self.__list_data_from_page_api(
lambda query_params: self.client.get_with_params(
diff --git a/pypaimon/api/api_response.py b/pypaimon/api/api_response.py
index 4f2361c6b8..6005e11810 100644
--- a/pypaimon/api/api_response.py
+++ b/pypaimon/api/api_response.py
@@ -19,7 +19,10 @@ limitations under the License.
from abc import ABC, abstractmethod
from typing import Dict, Optional, Any, Generic, List
from dataclasses import dataclass, field
+
+from api.rest_json import json_field
from api.typedef import T
+from api.data_types import DataField
@dataclass
@@ -34,15 +37,11 @@ class RESTResponse(ABC):
@dataclass
class ErrorResponse(RESTResponse):
- FIELD_RESOURCE_TYPE: "resourceType"
- FIELD_RESOURCE_NAME: "resourceName"
- FIELD_MESSAGE: "message"
- FIELD_CODE: "code"
- resource_type: Optional[str] = None
- resource_name: Optional[str] = None
- message: Optional[str] = None
- code: Optional[int] = None
+ resource_type: Optional[str] = json_field("resourceType", default=None)
+ resource_name: Optional[str] = json_field("resourceName", default=None)
+ message: Optional[str] = json_field("message", default=None)
+ code: Optional[int] = json_field("code", default=None)
def __init__(self,
resource_type: Optional[str] = None,
@@ -54,23 +53,6 @@ class ErrorResponse(RESTResponse):
self.message = message
self.code = code
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ErrorResponse':
- return cls(
- resource_type=data.get(cls.FIELD_RESOURCE_TYPE),
- resource_name=data.get(cls.FIELD_RESOURCE_NAME),
- message=data.get(cls.FIELD_MESSAGE),
- code=data.get(cls.FIELD_CODE),
- )
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- self.FIELD_RESOURCE_TYPE: self.resource_type,
- self.FIELD_RESOURCE_NAME: self.resource_name,
- self.FIELD_MESSAGE: self.message,
- self.FIELD_CODE: self.code
- }
-
@dataclass
class AuditRESTResponse(RESTResponse):
@@ -80,11 +62,11 @@ class AuditRESTResponse(RESTResponse):
FIELD_UPDATED_AT = "updatedAt"
FIELD_UPDATED_BY = "updatedBy"
- owner: Optional[str] = None
- created_at: Optional[int] = None
- created_by: Optional[str] = None
- updated_at: Optional[int] = None
- updated_by: Optional[str] = None
+ owner: Optional[str] = json_field(FIELD_OWNER, default=None)
+ created_at: Optional[int] = json_field(FIELD_CREATED_AT, default=None)
+ created_by: Optional[str] = json_field(FIELD_CREATED_BY, default=None)
+ updated_at: Optional[int] = json_field(FIELD_UPDATED_AT, default=None)
+ updated_by: Optional[str] = json_field(FIELD_UPDATED_BY, default=None)
def get_owner(self) -> Optional[str]:
return self.owner
@@ -118,21 +100,8 @@ class PagedResponse(RESTResponse, Generic[T]):
class ListDatabasesResponse(PagedResponse[str]):
FIELD_DATABASES = "databases"
- databases: List[str]
- next_page_token: str
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ListDatabasesResponse':
- return cls(
- databases=data.get(cls.FIELD_DATABASES),
- next_page_token=data.get(cls.FIELD_NEXT_PAGE_TOKEN)
- )
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- self.FIELD_DATABASES: self.databases,
- self.FIELD_NEXT_PAGE_TOKEN: self.next_page_token
- }
+ databases: List[str] = json_field(FIELD_DATABASES)
+ next_page_token: str = json_field(PagedResponse.FIELD_NEXT_PAGE_TOKEN)
def data(self) -> List[str]:
return self.databases
@@ -145,21 +114,8 @@ class ListDatabasesResponse(PagedResponse[str]):
class ListTablesResponse(PagedResponse[str]):
FIELD_TABLES = "tables"
- tables: Optional[List[str]]
- next_page_token: Optional[str]
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'ListTablesResponse':
- return cls(
- tables=data.get(cls.FIELD_TABLES),
- next_page_token=data.get(cls.FIELD_NEXT_PAGE_TOKEN)
- )
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- self.FIELD_TABLES: self.tables,
- self.FIELD_NEXT_PAGE_TOKEN: self.next_page_token
- }
+ tables: Optional[List[str]] = json_field(FIELD_TABLES)
+ next_page_token: Optional[str] = json_field(PagedResponse.FIELD_NEXT_PAGE_TOKEN)
def data(self) -> Optional[List[str]]:
return self.tables
@@ -168,88 +124,6 @@ class ListTablesResponse(PagedResponse[str]):
return self.next_page_token
-@dataclass
-class PaimonDataType:
- FIELD_TYPE = "type"
- FIELD_ELEMENT = "element"
- FIELD_FIELDS = "fields"
- FIELD_KEY = "key"
- FIELD_VALUE = "value"
-
- type: str
- element: Optional['PaimonDataType'] = None
- fields: List['DataField'] = field(default_factory=list)
- key: Optional['PaimonDataType'] = None
- value: Optional['PaimonDataType'] = None
-
- @classmethod
- def from_dict(cls, data: Any) -> 'PaimonDataType':
- if isinstance(data, dict):
- element = data.get(cls.FIELD_ELEMENT, None)
- fields = data.get(cls.FIELD_FIELDS, None)
- key = data.get(cls.FIELD_KEY, None)
- value = data.get(cls.FIELD_VALUE, None)
- if element is not None:
- element = PaimonDataType.from_dict(data.get(cls.FIELD_ELEMENT)),
- if fields is not None:
- fields = list(map(lambda f: DataField.from_dict(f), fields)),
- if key is not None:
- key = PaimonDataType.from_dict(key)
- if value is not None:
- value = PaimonDataType.from_dict(value)
- return cls(
- type=data.get(cls.FIELD_TYPE),
- element=element,
- fields=fields,
- key=key,
- value=value,
- )
- else:
- return cls(type=data)
-
- def to_dict(self) -> Any:
- if self.element is None and self.fields is None and self.key:
- return self.type
- if self.element is not None:
- return {self.FIELD_TYPE: self.type, self.FIELD_ELEMENT: self.element}
- elif self.fields is not None:
- return {self.FIELD_TYPE: self.type, self.FIELD_FIELDS: self.fields}
- elif self.value is not None:
- return {self.FIELD_TYPE: self.type, self.FIELD_KEY: self.key, self.FIELD_VALUE: self.value}
- elif self.key is not None and self.value is None:
- return {self.FIELD_TYPE: self.type, self.FIELD_KEY: self.key}
-
-
-@dataclass
-class DataField:
- FIELD_ID = "id"
- FIELD_NAME = "name"
- FIELD_TYPE = "type"
- FIELD_DESCRIPTION = "description"
-
- description: str
- id: int
- name: str
- type: PaimonDataType
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'DataField':
- return cls(
- id=data.get(cls.FIELD_ID),
- name=data.get(cls.FIELD_NAME),
- type=PaimonDataType.from_dict(data.get(cls.FIELD_TYPE)),
- description=data.get(cls.FIELD_DESCRIPTION),
- )
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- self.FIELD_ID: self.id,
- self.FIELD_NAME: self.name,
- self.FIELD_TYPE: PaimonDataType.to_dict(self.type),
- self.FIELD_DESCRIPTION: self.description
- }
-
-
@dataclass
class Schema:
FIELD_FIELDS = "fields"
@@ -258,30 +132,11 @@ class Schema:
FIELD_OPTIONS = "options"
FIELD_COMMENT = "comment"
- fields: List[DataField] = field(default_factory=list)
- partition_keys: List[str] = field(default_factory=list)
- primary_keys: List[str] = field(default_factory=list)
- options: Dict[str, str] = field(default_factory=dict)
- comment: Optional[str] = None
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'Schema':
- return cls(
- fields=list(map(lambda f: DataField.from_dict(f), data.get(cls.FIELD_FIELDS))),
- partition_keys=data.get(cls.FIELD_PARTITION_KEYS),
- primary_keys=data.get(cls.FIELD_PRIMARY_KEYS),
- options=data.get(cls.FIELD_OPTIONS),
- comment=data.get(cls.FIELD_COMMENT)
- )
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- self.FIELD_FIELDS: list(map(lambda f: DataField.to_dict(f), self.fields)),
- self.FIELD_PARTITION_KEYS: self.partition_keys,
- self.FIELD_PRIMARY_KEYS: self.primary_keys,
- self.FIELD_OPTIONS: self.options,
- self.FIELD_COMMENT: self.comment
- }
+ fields: List[DataField] = json_field(FIELD_FIELDS, default_factory=list)
+ partition_keys: List[str] = json_field(FIELD_PARTITION_KEYS, default_factory=list)
+ primary_keys: List[str] = json_field(FIELD_PRIMARY_KEYS, default_factory=list)
+ options: Dict[str, str] = json_field(FIELD_OPTIONS, default_factory=dict)
+ comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
@dataclass
@@ -332,12 +187,12 @@ class GetTableResponse(AuditRESTResponse):
FIELD_SCHEMA_ID = "schemaId"
FIELD_SCHEMA = "schema"
- id: Optional[str] = None
- name: Optional[str] = None
- path: Optional[str] = None
- is_external: Optional[bool] = None
- schema_id: Optional[int] = None
- schema: Optional[Schema] = None
+ id: Optional[str] = json_field(FIELD_ID, default=None)
+ name: Optional[str] = json_field(FIELD_NAME, default=None)
+ path: Optional[str] = json_field(FIELD_PATH, default=None)
+ is_external: Optional[bool] = json_field(FIELD_IS_EXTERNAL, default=None)
+ schema_id: Optional[int] = json_field(FIELD_SCHEMA_ID, default=None)
+ schema: Optional[Schema] = json_field(FIELD_SCHEMA, default=None)
def __init__(self,
id: str,
@@ -359,44 +214,6 @@ class GetTableResponse(AuditRESTResponse):
self.schema_id = schema_id
self.schema = schema
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'GetTableResponse':
- return cls(
- id=data.get(cls.FIELD_ID),
- name=data.get(cls.FIELD_NAME),
- path=data.get(cls.FIELD_PATH),
- is_external=data.get(cls.FIELD_IS_EXTERNAL),
- schema_id=data.get(cls.FIELD_SCHEMA_ID),
- schema=Schema.from_dict(data.get(cls.FIELD_SCHEMA)),
- owner=data.get(cls.FIELD_OWNER),
- created_at=data.get(cls.FIELD_CREATED_AT),
- created_by=data.get(cls.FIELD_CREATED_BY),
- updated_at=data.get(cls.FIELD_UPDATED_AT),
- updated_by=data.get(cls.FIELD_UPDATED_BY)
- )
-
- def to_dict(self) -> Dict[str, Any]:
- result = {
- self.FIELD_ID: self.id,
- self.FIELD_NAME: self.name,
- self.FIELD_PATH: self.path,
- self.FIELD_IS_EXTERNAL: self.is_external,
- self.FIELD_SCHEMA_ID: self.schema_id,
- self.FIELD_SCHEMA: Schema.to_dict(self.schema)
- }
- if self.owner is not None:
- result[self.FIELD_OWNER] = self.owner
- if self.created_at is not None:
- result[self.FIELD_CREATED_AT] = self.created_at
- if self.created_by is not None:
- result[self.FIELD_CREATED_BY] = self.created_by
- if self.updated_at is not None:
- result[self.FIELD_UPDATED_AT] = self.updated_at
- if self.updated_by is not None:
- result[self.FIELD_UPDATED_BY] = self.updated_by
-
- return result
-
@dataclass
class GetDatabaseResponse(AuditRESTResponse):
@@ -405,10 +222,10 @@ class GetDatabaseResponse(AuditRESTResponse):
FIELD_LOCATION = "location"
FIELD_OPTIONS = "options"
- id: Optional[str] = None
- name: Optional[str] = None
- location: Optional[str] = None
- options: Optional[Dict[str, str]] = field(default_factory=dict)
+ id: Optional[str] = json_field(FIELD_ID, default=None)
+ name: Optional[str] = json_field(FIELD_NAME, default=None)
+ location: Optional[str] = json_field(FIELD_LOCATION, default=None)
+ options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default_factory=dict)
def __init__(self,
id: Optional[str] = None,
@@ -438,54 +255,12 @@ class GetDatabaseResponse(AuditRESTResponse):
def get_options(self) -> Dict[str, str]:
return self.options or {}
- def to_dict(self) -> Dict[str, Any]:
- result = {
- self.FIELD_ID: self.id,
- self.FIELD_NAME: self.name,
- self.FIELD_LOCATION: self.location,
- self.FIELD_OPTIONS: self.options
- }
-
- if self.owner is not None:
- result[self.FIELD_OWNER] = self.owner
- if self.created_at is not None:
- result[self.FIELD_CREATED_AT] = self.created_at
- if self.created_by is not None:
- result[self.FIELD_CREATED_BY] = self.created_by
- if self.updated_at is not None:
- result[self.FIELD_UPDATED_AT] = self.updated_at
- if self.updated_by is not None:
- result[self.FIELD_UPDATED_BY] = self.updated_by
-
- return result
-
- @classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'GetDatabaseResponse':
- return cls(
- id=data.get(cls.FIELD_ID),
- name=data.get(cls.FIELD_NAME),
- location=data.get(cls.FIELD_LOCATION),
- options=data.get(cls.FIELD_OPTIONS, {}),
- owner=data.get(cls.FIELD_OWNER),
- created_at=data.get(cls.FIELD_CREATED_AT),
- created_by=data.get(cls.FIELD_CREATED_BY),
- updated_at=data.get(cls.FIELD_UPDATED_AT),
- updated_by=data.get(cls.FIELD_UPDATED_BY)
- )
-
@dataclass
class ConfigResponse(RESTResponse):
FILED_DEFAULTS = "defaults"
- defaults: Dict[str, str]
-
- @classmethod
- def from_dict(cls, data: Dict[str, str]) -> 'ConfigResponse':
- return cls(defaults=data.get(cls.FILED_DEFAULTS))
-
- def to_dict(self) -> Dict[str, Any]:
- return {self.FILED_DEFAULTS: self.defaults}
+ defaults: Dict[str, str] = json_field(FILED_DEFAULTS)
def merge(self, options: Dict[str, str]) -> Dict[str, str]:
merged = options.copy()
diff --git a/pypaimon/api/api_resquest.py b/pypaimon/api/api_resquest.py
index 2d0d2583a8..1434176849 100644
--- a/pypaimon/api/api_resquest.py
+++ b/pypaimon/api/api_resquest.py
@@ -20,6 +20,8 @@ from abc import ABC
from dataclasses import dataclass
from typing import Dict, List
+from api.rest_json import json_field
+
class RESTRequest(ABC):
pass
@@ -27,11 +29,17 @@ class RESTRequest(ABC):
@dataclass
class CreateDatabaseRequest(RESTRequest):
- name: str
- properties: Dict[str, str]
+ FIELD_NAME = "name"
+ FIELD_OPTIONS = "options"
+
+ name: str = json_field(FIELD_NAME)
+ options: Dict[str, str] = json_field(FIELD_OPTIONS)
@dataclass
class AlterDatabaseRequest(RESTRequest):
- removals: List[str]
- updates: Dict[str, str]
+ FIELD_REMOVALS = "removals"
+ FIELD_UPDATES = "updates"
+
+ removals: List[str] = json_field(FIELD_REMOVALS)
+ updates: Dict[str, str] = json_field(FIELD_UPDATES)
diff --git a/pypaimon/api/auth.py b/pypaimon/api/auth.py
index 9e88651694..aabefe2d49 100644
--- a/pypaimon/api/auth.py
+++ b/pypaimon/api/auth.py
@@ -45,6 +45,7 @@ class DLFToken:
from api import RESTCatalogOptions
self.access_key_id = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID)
self.access_key_secret = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET)
+ self.security_token = options.get(RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
class AuthProvider(ABC):
diff --git a/pypaimon/api/client.py b/pypaimon/api/client.py
index fc41e2672d..8c4b5b85e0 100644
--- a/pypaimon/api/client.py
+++ b/pypaimon/api/client.py
@@ -18,9 +18,10 @@ limitations under the License.
import json
import logging
+import traceback
import urllib.parse
from abc import ABC, abstractmethod
-from typing import Dict, Optional, Type, TypeVar, Callable
+from typing import Dict, Optional, Type, TypeVar, Callable, Any
import requests
from requests.adapters import HTTPAdapter
@@ -38,10 +39,100 @@ class RESTRequest(ABC):
class RESTException(Exception):
+ def __init__(self, message: str = None, *args: Any, cause: Optional[Exception] = None):
+ if message and args:
+ try:
+ formatted_message = message % args
+ except (TypeError, ValueError):
+ formatted_message = f"{message} {' '.join(str(arg) for arg in args)}"
+ else:
+ formatted_message = message or "REST API error occurred"
+
+ super().__init__(formatted_message)
+ self.__cause__ = cause
+
+ def get_cause(self) -> Optional[Exception]:
+ return self.__cause__
+
+ def get_message(self) -> str:
+ return str(self)
+
+ def print_stack_trace(self) -> None:
+ traceback.print_exception(type(self), self, self.__traceback__)
+
+ def get_stack_trace(self) -> str:
+ return ''.join(traceback.format_exception(type(self), self, self.__traceback__))
+
+ def __repr__(self) -> str:
+ if self.__cause__:
+ return f"{self.__class__.__name__}('{self}', caused by {type(self.__cause__).__name__}: {self.__cause__})"
+ return f"{self.__class__.__name__}('{self}')"
+
+
+class BadRequestException(RESTException):
+
+ def __init__(self, message: str = None, *args: Any):
+ super().__init__(message, *args)
+
+
+class BadRequestException(RESTException):
+ """Exception for 400 Bad Request"""
+ pass
+
+
+class NotAuthorizedException(RESTException):
+ """Exception for not authorized (401)"""
+
+ def __init__(self, message: str, *args: Any):
+ super().__init__(message, *args)
+
+
+class ForbiddenException(RESTException):
+ """Exception for forbidden access (403)"""
+
+ def __init__(self, message: str, *args: Any):
+ super().__init__(message, *args)
+
+
+class NoSuchResourceException(RESTException):
+ """Exception for resource not found (404)"""
+
+ def __init__(self, resource_type: Optional[str], resource_name: Optional[str],
+ message: str, *args: Any):
+ self.resource_type = resource_type
+ self.resource_name = resource_name
+ super().__init__(message, *args)
+
+
+class AlreadyExistsException(RESTException):
+ """Exception for resource already exists (409)"""
+
+ def __init__(self, resource_type: Optional[str], resource_name: Optional[str],
+ message: str, *args: Any):
+ self.resource_type = resource_type
+ self.resource_name = resource_name
+ super().__init__(message, *args)
+
+
+class ServiceFailureException(RESTException):
+ """Exception for service failure (500)"""
+
+ def __init__(self, message: str, *args: Any):
+ super().__init__(message, *args)
+
+
+class NotImplementedException(RESTException):
+ """Exception for not implemented (501)"""
- def __init__(self, message: str, cause: Optional[Exception] = None):
- super().__init__(message)
- self.cause = cause
+ def __init__(self, message: str, *args: Any):
+ super().__init__(message, *args)
+
+
+class ServiceUnavailableException(RESTException):
+ """Exception for service unavailable (503)"""
+
+ def __init__(self, message: str, *args: Any):
+ super().__init__(message, *args)
class ErrorHandler(ABC):
@@ -51,24 +142,95 @@ class ErrorHandler(ABC):
pass
+# DefaultErrorHandler implementation
class DefaultErrorHandler(ErrorHandler):
+ """
+ Default error handler that converts error responses to appropriate
exceptions.
+
+ This class implements the singleton pattern and handles various HTTP error
codes
+ by throwing corresponding exception types.
+ """
- _instance = None
+ _instance: Optional['DefaultErrorHandler'] = None
+
+ def __new__(cls) -> 'DefaultErrorHandler':
+ """Implement singleton pattern"""
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ return cls._instance
@classmethod
def get_instance(cls) -> 'DefaultErrorHandler':
+ """Get the singleton instance of DefaultErrorHandler"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def accept(self, error: ErrorResponse, request_id: str) -> None:
- message = f"REST API error (request_id: {request_id}): {error.message}"
- if error.resource_name:
- message += f" (resource: {error.resource_name})"
- if error.resource_type:
- message += f" (resource_type: {error.resource_type})"
+ """
+ Handle an error response by throwing appropriate exception.
+
+ Args:
+ error: The error response to handle
+ request_id: The request ID associated with the error
- raise RESTException(message)
+ Raises:
+ Appropriate exception based on error code
+ """
+ code = error.code
+
+ # Format message with request ID if not default
+ if LoggingInterceptor.DEFAULT_REQUEST_ID == request_id:
+ message = error.message
+ else:
+ # If we have a requestId, append it to the message
+ message = f"{error.message} requestId:{request_id}"
+
+ # Handle different error codes
+ if code == 400:
+ raise BadRequestException("%s", message)
+
+ elif code == 401:
+ raise NotAuthorizedException("Not authorized: %s", message)
+
+ elif code == 403:
+ raise ForbiddenException("Forbidden: %s", message)
+
+ elif code == 404:
+ raise NoSuchResourceException(
+ error.resource_type,
+ error.resource_name,
+ "%s",
+ message
+ )
+
+ elif code in [405, 406]:
+ # These codes are handled but don't throw exceptions
+ pass
+
+ elif code == 409:
+ raise AlreadyExistsException(
+ error.resource_type,
+ error.resource_name,
+ "%s",
+ message
+ )
+
+ elif code == 500:
+ raise ServiceFailureException("Server error: %s", message)
+
+ elif code == 501:
+ raise NotImplementedException(message)
+
+ elif code == 503:
+ raise ServiceUnavailableException("Service unavailable: %s", message)
+
+ else:
+ # Default case for unhandled codes
+ pass
+
+ # If no specific exception was thrown, throw generic RESTException
+ raise RESTException("Unable to process: %s", message)
class ExponentialRetryInterceptor:
@@ -97,7 +259,6 @@ class ExponentialRetryInterceptor:
class LoggingInterceptor:
-
REQUEST_ID_KEY = "x-request-id"
DEFAULT_REQUEST_ID = "unknown"
@@ -250,17 +411,17 @@ class HttpClient(RESTClient):
rest_auth_function:
Callable[[RESTAuthParameter], Dict[str, str]]) -> T:
try:
body_str = JSON.to_json(body)
- auth_headers = _get_headers(path, "POST", body_str, rest_auth_function)
+ auth_headers = _get_headers(path, "POST", None, body_str, rest_auth_function)
url = self._get_request_url(path, None)
-
- return self._execute_request("POST", url, data=body_str,
headers=auth_headers,
- response_type=response_type)
- except json.JSONEncodeError as e:
- raise RESTException("build request failed.", e)
+ return self._execute_request("POST", url, data=body_str, headers=auth_headers, response_type=response_type)
+ except RESTException as e:
+ raise e
+ except Exception as e:
+ raise RESTException("build request failed.", cause=e)
def delete(self, path: str,
rest_auth_function: Callable[[RESTAuthParameter], Dict[str,
str]]) -> T:
- auth_headers = _get_headers(path, "DELETE", "", rest_auth_function)
+ auth_headers = _get_headers(path, "DELETE", None, "", rest_auth_function)
url = self._get_request_url(path, None)
return self._execute_request("DELETE", url, headers=auth_headers,
response_type=None)
@@ -327,7 +488,7 @@ class HttpClient(RESTClient):
else:
raise RESTException("response body is null.")
- except RESTException:
- raise
+ except RESTException as e:
+ raise e
except Exception as e:
- raise RESTException("rest exception", e)
+ raise RESTException("rest exception", cause=e)
diff --git a/pypaimon/api/data_types.py b/pypaimon/api/data_types.py
new file mode 100644
index 0000000000..9a8080fdab
--- /dev/null
+++ b/pypaimon/api/data_types.py
@@ -0,0 +1,341 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import threading
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from enum import Enum
+from typing import Dict, Any, Optional, List, Union
+
+
+class AtomicInteger:
+
+ def __init__(self, initial_value: int = 0):
+ self._value = initial_value
+ self._lock = threading.RLock()
+
+ def get(self) -> int:
+ with self._lock:
+ return self._value
+
+ def increment_and_get(self) -> int:
+ with self._lock:
+ self._value += 1
+ return self._value
+
+ def get_and_increment(self) -> int:
+ with self._lock:
+ old_value = self._value
+ self._value += 1
+ return old_value
+
+ def set(self, value: int):
+ with self._lock:
+ self._value = value
+
+
+class DataType(ABC):
+ def __init__(self, nullable: bool = True):
+ self.nullable = nullable
+
+ @abstractmethod
+ def to_dict(self) -> Dict[str, Any]:
+ pass
+
+ @abstractmethod
+ def __str__(self) -> str:
+ pass
+
+
+@dataclass
+class AtomicType(DataType):
+ type: str
+
+ def __init__(self, type: str, nullable: bool = True):
+ super().__init__(nullable)
+ self.type = type
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'type': self.type,
+ 'nullable': self.nullable
+ }
+
+ def __str__(self) -> str:
+ null_suffix = '' if self.nullable else ' NOT NULL'
+ return f"{self.type}{null_suffix}"
+
+
+@dataclass
+class ArrayType(DataType):
+ element: DataType
+
+ def __init__(self, nullable: bool, element_type: DataType):
+ super().__init__(nullable)
+ self.element = element_type
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'type': f"ARRAY{'<' + str(self.element) + '>' if self.element else ''}",
+ 'element': self.element.to_dict() if self.element else None,
+ 'nullable': self.nullable
+ }
+
+ def __str__(self) -> str:
+ null_suffix = '' if self.nullable else ' NOT NULL'
+ return f"ARRAY<{self.element}>{null_suffix}"
+
+
+@dataclass
+class MultisetType(DataType):
+ element: DataType
+
+ def __init__(self, nullable: bool, element_type: DataType):
+ super().__init__(nullable)
+ self.element = element_type
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'type': f"MULTISET{'<' + str(self.element) + '>' if self.element else ''}",
+ 'element': self.element.to_dict() if self.element else None,
+ 'nullable': self.nullable
+ }
+
+ def __str__(self) -> str:
+ null_suffix = '' if self.nullable else ' NOT NULL'
+ return f"MULTISET<{self.element}>{null_suffix}"
+
+
+@dataclass
+class MapType(DataType):
+ key: DataType
+ value: DataType
+
+ def __init__(self, nullable: bool, key_type: DataType, value_type: DataType):
+ super().__init__(nullable)
+ self.key = key_type
+ self.value = value_type
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'type': f"MAP<{self.key}, {self.value}>",
+ 'key': self.key.to_dict() if self.key else None,
+ 'value': self.value.to_dict() if self.value else None,
+ 'nullable': self.nullable
+ }
+
+ def __str__(self) -> str:
+ null_suffix = '' if self.nullable else ' NOT NULL'
+ return f"MAP<{self.key}, {self.value}>{null_suffix}"
+
+
+@dataclass
+class DataField:
+ FIELD_ID = "id"
+ FIELD_NAME = "name"
+ FIELD_TYPE = "type"
+ FIELD_DESCRIPTION = "description"
+ FIELD_DEFAULT_VALUE = "defaultValue"
+
+ id: int
+ name: str
+ type: DataType
+ description: Optional[str] = None
+ default_value: Optional[str] = None
+
+ def __init__(self, id: int, name: str, type: DataType, description: Optional[str] = None,
+ default_value: Optional[str] = None):
+ self.id = id
+ self.name = name
+ self.type = type
+ self.description = description
+ self.default_value = default_value
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'DataField':
+ return DataTypeParser.parse_data_field(data)
+
+ def to_dict(self) -> Dict[str, Any]:
+ result = {
+ self.FIELD_ID: self.id,
+ self.FIELD_NAME: self.name,
+ self.FIELD_TYPE: self.type.to_dict() if self.type else None
+ }
+
+ if self.description is not None:
+ result[self.FIELD_DESCRIPTION] = self.description
+
+ if self.default_value is not None:
+ result[self.FIELD_DEFAULT_VALUE] = self.default_value
+
+ return result
+
+
+@dataclass
+class RowType(DataType):
+ fields: List[DataField]
+
+ def __init__(self, nullable: bool, fields: List[DataField]):
+ super().__init__(nullable)
+ self.fields = fields or []
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ 'type': 'ROW' + ('' if self.nullable else ' NOT NULL'),
+ 'fields': [field.to_dict() for field in self.fields],
+ 'nullable': self.nullable
+ }
+
+ def __str__(self) -> str:
+ field_strs = [f"{field.name}: {field.type}" for field in self.fields]
+ null_suffix = '' if self.nullable else ' NOT NULL'
+ return f"ROW<{', '.join(field_strs)}>{null_suffix}"
+
+
+class Keyword(Enum):
+ CHAR = "CHAR"
+ VARCHAR = "VARCHAR"
+ STRING = "STRING"
+ BOOLEAN = "BOOLEAN"
+ BINARY = "BINARY"
+ VARBINARY = "VARBINARY"
+ BYTES = "BYTES"
+ DECIMAL = "DECIMAL"
+ NUMERIC = "NUMERIC"
+ DEC = "DEC"
+ TINYINT = "TINYINT"
+ SMALLINT = "SMALLINT"
+ INT = "INT"
+ INTEGER = "INTEGER"
+ BIGINT = "BIGINT"
+ FLOAT = "FLOAT"
+ DOUBLE = "DOUBLE"
+ DATE = "DATE"
+ TIME = "TIME"
+ TIMESTAMP = "TIMESTAMP"
+ TIMESTAMP_LTZ = "TIMESTAMP_LTZ"
+ VARIANT = "VARIANT"
+
+
+class DataTypeParser:
+
+ @staticmethod
+ def parse_nullability(type_string: str) -> bool:
+ if ('NOT NULL' in type_string):
+ return False
+ elif ('NULL' in type_string):
+ return True
+ return True
+
+ @staticmethod
+ def parse_atomic_type_sql_string(type_string: str) -> DataType:
+ type_upper = type_string.upper().strip()
+
+ if '(' in type_upper:
+ base_type = type_upper.split('(')[0]
+ else:
+ base_type = type_upper
+
+ try:
+ Keyword(base_type)
+ return AtomicType(type_string, DataTypeParser.parse_nullability(type_string))
+ except ValueError:
+ raise Exception(f"Unknown type: {base_type}")
+
+ @staticmethod
+ def parse_data_type(json_data: Union[Dict[str, Any], str], field_id: Optional[AtomicInteger] = None) -> DataType:
+
+ if isinstance(json_data, str):
+ return DataTypeParser.parse_atomic_type_sql_string(json_data)
+
+ if isinstance(json_data, dict):
+ if 'type' not in json_data:
+ raise ValueError(f"Missing 'type' field in JSON: {json_data}")
+
+ type_string = json_data['type']
+
+ if type_string.startswith("ARRAY"):
+ element = DataTypeParser.parse_data_type(json_data.get('element'), field_id)
+ nullable = 'NOT NULL' not in type_string
+ return ArrayType(nullable, element)
+
+ elif type_string.startswith("MULTISET"):
+ element = DataTypeParser.parse_data_type(json_data.get('element'), field_id)
+ nullable = 'NOT NULL' not in type_string
+ return MultisetType(nullable, element)
+
+ elif type_string.startswith("MAP"):
+ key = DataTypeParser.parse_data_type(json_data.get('key'), field_id)
+ value = DataTypeParser.parse_data_type(json_data.get('value'), field_id)
+ nullable = 'NOT NULL' not in type_string
+ return MapType(nullable, key, value)
+
+ elif type_string.startswith("ROW"):
+ field_array = json_data.get('fields', [])
+ fields = []
+ for field_json in field_array:
+ fields.append(DataTypeParser.parse_data_field(field_json, field_id))
+ nullable = 'NOT NULL' not in type_string
+ return RowType(nullable, fields)
+
+ else:
+ return DataTypeParser.parse_atomic_type_sql_string(type_string)
+
+ raise ValueError(f"Cannot parse data type: {json_data}")
+
+ @staticmethod
+ def parse_data_field(json_data: Dict[str, Any], field_id: Optional[AtomicInteger] = None) -> DataField:
+
+ if DataField.FIELD_ID in json_data and json_data[DataField.FIELD_ID]
is not None:
+ if field_id is not None and field_id.get() != -1:
+ raise ValueError("Partial field id is not allowed.")
+ field_id_value = int(json_data['id'])
+ else:
+ if field_id is None:
+ raise ValueError("Field ID is required when not provided in JSON")
+ field_id_value = field_id.increment_and_get()
+
+ if DataField.FIELD_NAME not in json_data:
+ raise ValueError("Missing 'name' field in JSON")
+ name = json_data[DataField.FIELD_NAME]
+
+ if DataField.FIELD_TYPE not in json_data:
+ raise ValueError("Missing 'type' field in JSON")
+ data_type = DataTypeParser.parse_data_type(json_data[DataField.FIELD_TYPE], field_id)
+
+ description = json_data.get(DataField.FIELD_DESCRIPTION)
+
+ default_value = json_data.get(DataField.FIELD_DEFAULT_VALUE)
+
+ return DataField(
+ id=field_id_value,
+ name=name,
+ type=data_type,
+ description=description,
+ default_value=default_value
+ )
+
+
+def parse_data_type_from_json(json_str: str, field_id: Optional[AtomicInteger] = None) -> DataType:
+ json_data = json.loads(json_str)
+ return DataTypeParser.parse_data_type(json_data, field_id)
+
+
+def parse_data_field_from_json(json_str: str, field_id: Optional[AtomicInteger] = None) -> DataField:
+ json_data = json.loads(json_str)
+ return DataTypeParser.parse_data_field(json_data, field_id)
diff --git a/pypaimon/api/rest_json.py b/pypaimon/api/rest_json.py
index 57bdefdd06..c6a61449d2 100644
--- a/pypaimon/api/rest_json.py
+++ b/pypaimon/api/rest_json.py
@@ -16,36 +16,69 @@
# under the License.
import json
-from dataclasses import asdict
-from typing import Any, Type
+from dataclasses import field, fields, is_dataclass
+from typing import Any, Type, Dict
+
from api.typedef import T
def json_field(json_name: str, **kwargs):
    """Declare a dataclass field whose JSON key differs from its attribute name.

    The JSON serializer reads the 'json_name' metadata entry to decide which
    key to emit/accept for this field; all other keyword arguments are passed
    straight to dataclasses.field (e.g. default, default_factory).
    """
    metadata = {'json_name': json_name}
    return field(metadata=metadata, **kwargs)
+
class JSON:
    """Serialize/deserialize dataclasses to/from JSON, honoring the
    per-field 'json_name' metadata declared via json_field()."""

    @staticmethod
    def to_json(obj: Any, **kwargs) -> str:
        """Convert a dataclass instance to a JSON string.

        Extra keyword arguments are forwarded to json.dumps (e.g. indent=2).
        """
        return json.dumps(JSON.__to_dict(obj), ensure_ascii=False, **kwargs)

    @staticmethod
    def from_json(json_str: str, target_class: Type[T]) -> T:
        """Create a target_class instance from a JSON string.

        JSON keys are mapped back to field names via 'json_name' metadata;
        keys with no matching field are silently ignored.
        """
        data = json.loads(json_str)
        return JSON.__from_dict(data, target_class)

    @staticmethod
    def __to_dict(obj: Any) -> Dict[str, Any]:
        """Convert a dataclass instance to a dict with custom JSON key names."""
        result = {}
        for field_info in fields(obj):
            field_value = getattr(obj, field_info.name)
            # Custom JSON name from metadata, falling back to the attribute name.
            json_name = field_info.metadata.get('json_name', field_info.name)
            result[json_name] = JSON.__serialize_value(field_value)
        return result

    @staticmethod
    def __serialize_value(value: Any) -> Any:
        """Serialize one field value, recursing into nested structures."""
        if is_dataclass(value):
            return JSON.__to_dict(value)
        if hasattr(value, 'to_dict'):
            return value.to_dict()
        if isinstance(value, list):
            # Fix: recurse through the same dispatcher for list items, so a
            # list of nested dataclasses serializes too. Previously only
            # items exposing to_dict() were converted; dataclass items were
            # emitted as raw objects and json.dumps then failed on them.
            return [JSON.__serialize_value(item) for item in value]
        return value

    @staticmethod
    def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
        """Create an instance from a dict, mapping JSON keys to field names."""
        # Build the reverse mapping: json_name -> field_name.
        field_mapping = {}
        for field_info in fields(target_class):
            json_name = field_info.metadata.get('json_name', field_info.name)
            field_mapping[json_name] = field_info.name
        # Keep only the keys that correspond to declared fields.
        kwargs = {}
        for json_name, value in data.items():
            if json_name in field_mapping:
                kwargs[field_mapping[json_name]] = value
        return target_class(**kwargs)
diff --git a/pypaimon/api/typedef.py b/pypaimon/api/typedef.py
index 157fa9c1d6..bb6ce4cef4 100644
--- a/pypaimon/api/typedef.py
+++ b/pypaimon/api/typedef.py
@@ -14,11 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from dataclasses import dataclass
from typing import Optional, TypeVar
T = TypeVar('T')
+
@dataclass
class Identifier:
"""Table/View/Function identifier"""
@@ -58,4 +60,4 @@ class Identifier:
return self.branch_name
def is_system_table(self) -> bool:
- return self.object_name.startswith('$')
\ No newline at end of file
+ return self.object_name.startswith('$')
diff --git a/pypaimon/tests/api_test.py b/pypaimon/tests/api_test.py
index 577b9ca94c..74a5c44436 100644
--- a/pypaimon/tests/api_test.py
+++ b/pypaimon/tests/api_test.py
@@ -27,11 +27,11 @@ import unittest
import api
from api.api_response import (ConfigResponse, ListDatabasesResponse,
GetDatabaseResponse, TableMetadata, Schema,
- GetTableResponse, ListTablesResponse,
TableSchema, RESTResponse, PagedList, DataField,
- PaimonDataType)
+ GetTableResponse, ListTablesResponse,
TableSchema, RESTResponse, PagedList, DataField)
from api import RESTApi
from api.rest_json import JSON
from api.typedef import Identifier
+from api.data_types import AtomicInteger, DataTypeParser, AtomicType,
ArrayType, MapType, RowType
@dataclass
@@ -743,7 +743,94 @@ class RESTCatalogServer:
class ApiTestCase(unittest.TestCase):
- def test(self):
    def test_parse_data(self):
        """Exercise DataTypeParser over atomic, array, map, row and nested types."""
        # Atomic SQL type strings round-trip: the parsed type keeps the
        # exact source string in .type.
        simple_type_test_cases = [
            "DECIMAL",
            "DECIMAL(5)",
            "DECIMAL(10, 2)",
            "DECIMAL(38, 18)",
            "VARBINARY",
            "VARBINARY(100)",
            "VARBINARY(1024)",
            "BYTES",
            "VARCHAR(255)",
            "CHAR(10)",
            "INT",
            "BOOLEAN"
        ]
        for type_str in simple_type_test_cases:
            data_type = DataTypeParser.parse_data_type(type_str)
            # Plain strings (no NOT NULL suffix) parse as nullable.
            self.assertEqual(data_type.nullable, True)
            self.assertEqual(data_type.type, type_str)
        field_id = AtomicInteger(0)
        simple_type = DataTypeParser.parse_data_type("VARCHAR(32)")
        self.assertEqual(simple_type.nullable, True)
        self.assertEqual(simple_type.type, 'VARCHAR(32)')

        # ARRAY type: element type comes from the 'element' entry.
        array_json = {
            "type": "ARRAY",
            "element": "INT"
        }
        array_type = DataTypeParser.parse_data_type(array_json, field_id)
        self.assertEqual(array_type.element.type, 'INT')

        # MAP type: key/value types come from the 'key' and 'value' entries.
        map_json = {
            "type": "MAP",
            "key": "STRING",
            "value": "INT"
        }
        map_type = DataTypeParser.parse_data_type(map_json, field_id)
        self.assertEqual(map_type.key.type, 'STRING')
        self.assertEqual(map_type.value.type, 'INT')
        # ROW type: each entry of 'fields' becomes a DataField.
        row_json = {
            "type": "ROW",
            "fields": [
                {
                    "name": "id",
                    "type": "BIGINT",
                    "description": "Primary key"
                },
                {
                    "name": "name",
                    "type": "VARCHAR(100)",
                    "description": "User name"
                },
                {
                    "name": "scores",
                    "type": {
                        "type": "ARRAY",
                        "element": "DOUBLE"
                    }
                }
            ]
        }

        row_type: RowType = DataTypeParser.parse_data_type(row_json, AtomicInteger(0))
        self.assertEqual(row_type.fields[0].type.type, 'BIGINT')
        self.assertEqual(row_type.fields[1].type.type, 'VARCHAR(100)')

        # Nested composite: ARRAY<MAP<STRING, ROW<count BIGINT, percentage DOUBLE>>>.
        complex_json = {
            "type": "ARRAY",
            "element": {
                "type": "MAP",
                "key": "STRING",
                "value": {
                    "type": "ROW",
                    "fields": [
                        {"name": "count", "type": "BIGINT"},
                        {"name": "percentage", "type": "DOUBLE"}
                    ]
                }
            }
        }

        complex_type: ArrayType = DataTypeParser.parse_data_type(complex_json, field_id)
        element_type: MapType = complex_type.element
        value_type: RowType = element_type.value
        self.assertEqual(value_type.fields[0].type.type, 'BIGINT')
        self.assertEqual(value_type.fields[1].type.type, 'DOUBLE')
+
+ def test_api(self):
"""Example usage of RESTCatalogServer"""
# Setup logging
logging.basicConfig(level=logging.INFO)
@@ -773,11 +860,14 @@ class ApiTestCase(unittest.TestCase):
"test_db2": server.mock_database("test_db2", {"env": "test"}),
"prod_db": server.mock_database("prod_db", {"env": "prod"})
}
+ data_fields = [
+ DataField( 0, "name", AtomicType('INT'), 'desc name'),
+ DataField( 1, "arr11", ArrayType(True, AtomicType('INT')),
'desc arr11'),
+ DataField( 2, "map11", MapType(False, AtomicType('INT'),
MapType(False, AtomicType('INT'), AtomicType('INT'))), 'desc arr11'),
+ ]
+ schema = TableSchema(len(data_fields), data_fields,
len(data_fields), [], [], {}, "")
test_tables = {
- "default.user": TableMetadata(uuid=str(uuid.uuid4()),
is_external=True,
- schema=TableSchema(1,
-
[DataField("name", 0, "name", PaimonDataType('int'))],
- 1, [], [],
{}, "")),
+ "default.user": TableMetadata(uuid=str(uuid.uuid4()),
is_external=True,schema=schema),
}
server.table_metadata_store.update(test_tables)
server.database_store.update(test_databases)