This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 866dd0bd03 [Python] Support filesystem catalog for PyPaimon (#5986)
866dd0bd03 is described below
commit 866dd0bd039e4ad3a8da3725f5b01e86f9b64078
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Jul 30 14:51:37 2025 +0800
[Python] Support filesystem catalog for PyPaimon (#5986)
---
paimon-python/pypaimon/__init__.py | 12 +
paimon-python/pypaimon/api/__init__.py | 10 +-
paimon-python/pypaimon/api/api_response.py | 4 +-
paimon-python/pypaimon/api/api_resquest.py | 2 +-
paimon-python/pypaimon/api/auth.py | 8 +-
paimon-python/pypaimon/api/client.py | 2 +-
paimon-python/pypaimon/api/config.py | 3 +-
paimon-python/pypaimon/api/core_options.py | 2 +
paimon-python/pypaimon/api/identifier.py | 2 +-
paimon-python/pypaimon/api/token_loader.py | 22 +-
paimon-python/pypaimon/catalog/catalog.py | 20 +-
.../pypaimon/catalog/catalog_exception.py | 156 +++++++
.../file_io.py => catalog/catalog_factory.py} | 33 +-
paimon-python/pypaimon/catalog/catalog_utils.py | 1 -
.../pypaimon/catalog/filesystem_catalog.py | 103 +++++
paimon-python/pypaimon/common/file_io.py | 4 +
.../pypaimon/{api => common}/rest_json.py | 0
paimon-python/pypaimon/pvfs/__init__.py | 8 +-
paimon-python/pypaimon/rest/rest_catalog.py | 18 +-
.../pypaimon/{api => schema}/data_types.py | 0
paimon-python/pypaimon/schema/schema.py | 4 +-
paimon-python/pypaimon/schema/schema_manager.py | 2 +-
paimon-python/pypaimon/schema/table_schema.py | 6 +-
.../{common/file_io.py => table/bucket_mode.py} | 24 +-
paimon-python/pypaimon/table/file_store_table.py | 18 +-
.../{common/file_io.py => table/row/__init__.py} | 20 -
paimon-python/pypaimon/table/row/binary_row.py | 469 +++++++++++++++++++++
.../file_io.py => table/row/internal_row.py} | 43 +-
paimon-python/pypaimon/table/row/key_value.py | 57 +++
paimon-python/pypaimon/table/row/offset_row.py | 58 +++
paimon-python/pypaimon/table/row/row_kind.py | 61 +++
paimon-python/pypaimon/tests/api_test.py | 14 +-
paimon-python/pypaimon/tests/pvfs_test.py | 2 +-
paimon-python/pypaimon/tests/rest_server.py | 141 +------
34 files changed, 1074 insertions(+), 255 deletions(-)
diff --git a/paimon-python/pypaimon/__init__.py
b/paimon-python/pypaimon/__init__.py
index a67d5ea255..f5a9106115 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -14,3 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+from .schema.schema import Schema
+from .catalog.catalog import Catalog
+from .catalog.database import Database
+from .table.table import Table
+
+__all__ = [
+ 'Schema',
+ 'Catalog',
+ 'Database',
+ 'Table',
+]
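
With these re-exports in place, downstream code can import the public classes straight from the package root instead of reaching into submodules. A minimal sketch, using only the names in __all__ above:

    # Previously callers imported from submodules, e.g.
    #   from pypaimon.catalog.catalog import Catalog
    # The package root now re-exports the public surface:
    from pypaimon import Schema, Catalog, Database, Table
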
diff --git a/paimon-python/pypaimon/api/__init__.py
b/paimon-python/pypaimon/api/__init__.py
index 3c8d1fb2ef..2b35cd12df 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -31,7 +31,7 @@ from pypaimon.api.api_response import (
)
from pypaimon.api.api_resquest import CreateDatabaseRequest, AlterDatabaseRequest, RenameTableRequest, \
    CreateTableRequest
-from pypaimon.api.config import RESTCatalogOptions
+from pypaimon.api.config import CatalogOptions
from pypaimon.api.client import HttpClient
from pypaimon.api.identifier import Identifier
from pypaimon.api.typedef import T
@@ -89,7 +89,7 @@ class ResourcePaths:
@classmethod
def for_catalog_properties(
cls, options: dict[str, str]) -> "ResourcePaths":
- prefix = options.get(RESTCatalogOptions.PREFIX, "")
+ prefix = options.get(CatalogOptions.PREFIX, "")
return cls(prefix)
@staticmethod
@@ -131,15 +131,15 @@ class RESTApi:
def __init__(self, options: Dict[str, str], config_required: bool = True):
self.logger = logging.getLogger(self.__class__.__name__)
- self.client = HttpClient(options.get(RESTCatalogOptions.URI))
+ self.client = HttpClient(options.get(CatalogOptions.URI))
auth_provider = AuthProviderFactory.create_auth_provider(options)
base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX)
if config_required:
- warehouse = options.get(RESTCatalogOptions.WAREHOUSE)
+ warehouse = options.get(CatalogOptions.WAREHOUSE)
query_params = {}
if warehouse:
-            query_params[RESTCatalogOptions.WAREHOUSE] = RESTUtil.encode_string(
+            query_params[CatalogOptions.WAREHOUSE] = RESTUtil.encode_string(
                    warehouse)
config_response = self.client.get_with_params(
diff --git a/paimon-python/pypaimon/api/api_response.py
b/paimon-python/pypaimon/api/api_response.py
index b3ec148e94..b50754b3cf 100644
--- a/paimon-python/pypaimon/api/api_response.py
+++ b/paimon-python/pypaimon/api/api_response.py
@@ -20,9 +20,9 @@ from abc import ABC, abstractmethod
from typing import Dict, Optional, Generic, List
from dataclasses import dataclass
-from .rest_json import json_field
-from pypaimon.schema.schema import Schema
+from pypaimon.common.rest_json import json_field
from .typedef import T
+from .. import Schema
@dataclass
diff --git a/paimon-python/pypaimon/api/api_resquest.py
b/paimon-python/pypaimon/api/api_resquest.py
index 3ec7c12edc..1ddc3ebe5a 100644
--- a/paimon-python/pypaimon/api/api_resquest.py
+++ b/paimon-python/pypaimon/api/api_resquest.py
@@ -22,7 +22,7 @@ from typing import Dict, List
from .api_response import Schema
from .identifier import Identifier
-from .rest_json import json_field
+from pypaimon.common.rest_json import json_field
class RESTRequest(ABC):
diff --git a/paimon-python/pypaimon/api/auth.py
b/paimon-python/pypaimon/api/auth.py
index dc51df0740..393fa15920 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -27,7 +27,7 @@ from typing import Optional, Dict
from .token_loader import DLFTokenLoader, DLFToken
from .typedef import RESTAuthParameter
-from .config import RESTCatalogOptions
+from .config import CatalogOptions
class AuthProvider(ABC):
@@ -61,13 +61,13 @@ class AuthProviderFactory:
@staticmethod
def create_auth_provider(options: Dict[str, str]) -> AuthProvider:
- provider = options.get(RESTCatalogOptions.TOKEN_PROVIDER)
+ provider = options.get(CatalogOptions.TOKEN_PROVIDER)
if provider == 'bear':
- token = options.get(RESTCatalogOptions.TOKEN)
+ token = options.get(CatalogOptions.TOKEN)
return BearTokenAuthProvider(token)
elif provider == 'dlf':
return DLFAuthProvider(
- options.get(RESTCatalogOptions.DLF_REGION),
+ options.get(CatalogOptions.DLF_REGION),
DLFToken.from_options(options)
)
raise ValueError('Unknown auth provider')
diff --git a/paimon-python/pypaimon/api/client.py
b/paimon-python/pypaimon/api/client.py
index 7f7203a1c5..c318809a73 100644
--- a/paimon-python/pypaimon/api/client.py
+++ b/paimon-python/pypaimon/api/client.py
@@ -30,7 +30,7 @@ from urllib3 import Retry
from .typedef import RESTAuthParameter
from .api_response import ErrorResponse
-from .rest_json import JSON
+from pypaimon.common.rest_json import JSON
T = TypeVar('T', bound='RESTResponse')
diff --git a/paimon-python/pypaimon/api/config.py
b/paimon-python/pypaimon/api/config.py
index 54f7a8bc49..2a33182993 100644
--- a/paimon-python/pypaimon/api/config.py
+++ b/paimon-python/pypaimon/api/config.py
@@ -22,8 +22,9 @@ class OssOptions:
OSS_ENDPOINT = "fs.oss.endpoint"
-class RESTCatalogOptions:
+class CatalogOptions:
URI = "uri"
+ METASTORE = "metastore"
WAREHOUSE = "warehouse"
TOKEN_PROVIDER = "token.provider"
TOKEN = "token"
diff --git a/paimon-python/pypaimon/api/core_options.py
b/paimon-python/pypaimon/api/core_options.py
index 2745d1c281..18d2fba291 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/api/core_options.py
@@ -42,3 +42,5 @@ class CoreOptions(str, Enum):
FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
FILE_FORMAT_PER_LEVEL = "file.format.per.level"
FILE_BLOCK_SIZE = "file.block-size"
+ # Scan options
+ SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
diff --git a/paimon-python/pypaimon/api/identifier.py
b/paimon-python/pypaimon/api/identifier.py
index 0d280bbf21..3a27870516 100644
--- a/paimon-python/pypaimon/api/identifier.py
+++ b/paimon-python/pypaimon/api/identifier.py
@@ -18,7 +18,7 @@
from dataclasses import dataclass
from typing import Optional
-from pypaimon.api.rest_json import json_field
+from pypaimon.common.rest_json import json_field
SYSTEM_TABLE_SPLITTER = '$'
SYSTEM_BRANCH_PREFIX = 'branch-'
diff --git a/paimon-python/pypaimon/api/token_loader.py
b/paimon-python/pypaimon/api/token_loader.py
index 223490646c..9214345fce 100644
--- a/paimon-python/pypaimon/api/token_loader.py
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -24,8 +24,8 @@ import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException
-from .rest_json import json_field, JSON
-from .config import RESTCatalogOptions
+from pypaimon.common.rest_json import json_field, JSON
+from .config import CatalogOptions
from .client import ExponentialRetry
@@ -59,15 +59,15 @@ class DLFToken:
@classmethod
def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
- from .config import RESTCatalogOptions
- if (options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID) is None
-                or options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
+ from .config import CatalogOptions
+ if (options.get(CatalogOptions.DLF_ACCESS_KEY_ID) is None
+ or options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
return None
else:
return cls(
-                access_key_id=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
-                access_key_secret=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
-                security_token=options.get(RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
+                access_key_id=options.get(CatalogOptions.DLF_ACCESS_KEY_ID),
+                access_key_secret=options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET),
+                security_token=options.get(CatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
)
@@ -204,12 +204,12 @@ class DLFTokenLoaderFactory:
@staticmethod
    def create_token_loader(options: Dict[str, str]) -> Optional['DLFTokenLoader']:
"""Create ECS token loader"""
- loader = options.get(RESTCatalogOptions.DLF_TOKEN_LOADER)
+ loader = options.get(CatalogOptions.DLF_TOKEN_LOADER)
if loader == 'ecs':
ecs_metadata_url = options.get(
- RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL,
+ CatalogOptions.DLF_TOKEN_ECS_METADATA_URL,
'http://100.100.100.200/latest/meta-data/Ram/security-credentials/'
)
- role_name = options.get(RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME)
+ role_name = options.get(CatalogOptions.DLF_TOKEN_ECS_ROLE_NAME)
return DLFECSTokenLoader(ecs_metadata_url, role_name)
return None
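
For reference, a sketch of wiring the ECS token loader path above; the option constants and DLFECSTokenLoader come from this diff, while the role name is purely illustrative:

    from pypaimon.api.config import CatalogOptions
    from pypaimon.api.token_loader import DLFTokenLoaderFactory

    options = {
        CatalogOptions.DLF_TOKEN_LOADER: "ecs",
        CatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: "my-ram-role",  # illustrative
    }
    # Returns a DLFECSTokenLoader against the default ECS metadata URL;
    # any other loader value yields None.
    loader = DLFTokenLoaderFactory.create_token_loader(options)
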
diff --git a/paimon-python/pypaimon/catalog/catalog.py
b/paimon-python/pypaimon/catalog/catalog.py
index c133b362c5..f140f53852 100644
--- a/paimon-python/pypaimon/catalog/catalog.py
+++ b/paimon-python/pypaimon/catalog/catalog.py
@@ -17,7 +17,10 @@
#################################################################################
from abc import ABC, abstractmethod
-from typing import Optional
+from typing import Optional, Union
+
+from pypaimon import Schema
+from pypaimon.api import Identifier
class Catalog(ABC):
@@ -25,13 +28,26 @@ class Catalog(ABC):
This interface is responsible for reading and writing
metadata such as database/table from a paimon catalog.
"""
+ DB_SUFFIX = ".db"
+ DEFAULT_DATABASE = "default"
SYSTEM_DATABASE_NAME = "sys"
+
DB_LOCATION_PROP = "location"
+ COMMENT_PROP = "comment"
+ OWNER_PROP = "owner"
@abstractmethod
def get_database(self, name: str) -> 'Database':
"""Get paimon database identified by the given name."""
@abstractmethod
- def create_database(self, name: str, properties: Optional[dict] = None):
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
"""Create a database with properties."""
+
+ @abstractmethod
+ def get_table(self, identifier: Union[str, Identifier]) -> 'Table':
+ """Get paimon table identified by the given Identifier."""
+
+ @abstractmethod
+    def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
+ """Create table with schema."""
diff --git a/paimon-python/pypaimon/catalog/catalog_exception.py
b/paimon-python/pypaimon/catalog/catalog_exception.py
new file mode 100644
index 0000000000..c29243c504
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/catalog_exception.py
@@ -0,0 +1,156 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.api import Identifier
+
+
+# Exception classes
+class CatalogException(Exception):
+ """Base catalog exception"""
+
+
+class DatabaseNotExistException(CatalogException):
+ """Database not exist exception"""
+
+ def __init__(self, database: str):
+ self.database = database
+ super().__init__(f"Database {database} does not exist")
+
+
+class DatabaseAlreadyExistException(CatalogException):
+ """Database already exist exception"""
+
+ def __init__(self, database: str):
+ self.database = database
+ super().__init__(f"Database {database} already exists")
+
+
+class DatabaseNoPermissionException(CatalogException):
+ """Database no permission exception"""
+
+ def __init__(self, database: str):
+ self.database = database
+ super().__init__(f"No permission to access database {database}")
+
+
+class TableNotExistException(CatalogException):
+ """Table not exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Table {identifier.get_full_name()} does not exist")
+
+
+class TableAlreadyExistException(CatalogException):
+ """Table already exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Table {identifier.get_full_name()} already exists")
+
+
+class TableNoPermissionException(CatalogException):
+ """Table no permission exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"No permission to access table
{identifier.get_full_name()}")
+
+
+class ViewNotExistException(CatalogException):
+ """View not exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"View {identifier.get_full_name()} does not exist")
+
+
+class ViewAlreadyExistException(CatalogException):
+ """View already exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"View {identifier.get_full_name()} already exists")
+
+
+class FunctionNotExistException(CatalogException):
+ """Function not exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Function {identifier.get_full_name()} does not
exist")
+
+
+class FunctionAlreadyExistException(CatalogException):
+ """Function already exist exception"""
+
+ def __init__(self, identifier: Identifier):
+ self.identifier = identifier
+ super().__init__(f"Function {identifier.get_full_name()} already
exists")
+
+
+class ColumnNotExistException(CatalogException):
+ """Column not exist exception"""
+
+ def __init__(self, column: str):
+ self.column = column
+ super().__init__(f"Column {column} does not exist")
+
+
+class ColumnAlreadyExistException(CatalogException):
+ """Column already exist exception"""
+
+ def __init__(self, column: str):
+ self.column = column
+ super().__init__(f"Column {column} already exists")
+
+
+class DefinitionNotExistException(CatalogException):
+ """Definition not exist exception"""
+
+ def __init__(self, identifier: Identifier, name: str):
+ self.identifier = identifier
+ self.name = name
+ super().__init__(f"Definition {name} does not exist in
{identifier.get_full_name()}")
+
+
+class DefinitionAlreadyExistException(CatalogException):
+ """Definition already exist exception"""
+
+ def __init__(self, identifier: Identifier, name: str):
+ self.identifier = identifier
+ self.name = name
+ super().__init__(f"Definition {name} already exists in
{identifier.get_full_name()}")
+
+
+class DialectNotExistException(CatalogException):
+ """Dialect not exist exception"""
+
+ def __init__(self, identifier: Identifier, dialect: str):
+ self.identifier = identifier
+ self.dialect = dialect
+ super().__init__(f"Dialect {dialect} does not exist in
{identifier.get_full_name()}")
+
+
+class DialectAlreadyExistException(CatalogException):
+ """Dialect already exist exception"""
+
+ def __init__(self, identifier: Identifier, dialect: str):
+ self.identifier = identifier
+ self.dialect = dialect
+ super().__init__(f"Dialect {dialect} already exists in
{identifier.get_full_name()}")
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/catalog/catalog_factory.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/catalog/catalog_factory.py
index 9e7418966f..b5ec706ae5 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -15,23 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+from pypaimon import Catalog
+from pypaimon.api import CatalogOptions
+from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
+from pypaimon.rest.rest_catalog import RESTCatalog
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+class CatalogFactory:
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
+ CATALOG_REGISTRY = {
+ "filesystem": FileSystemCatalog,
+ "rest": RESTCatalog,
+ }
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
+ @staticmethod
+ def create(catalog_options: dict) -> Catalog:
+        identifier = catalog_options.get(CatalogOptions.METASTORE, "filesystem")
+ catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
+ if catalog_class is None:
+ raise ValueError(f"Unknown catalog identifier: {identifier}. "
+ f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
+ return catalog_class(catalog_options)
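
With the factory in place, catalog construction reduces to an options dict; the metastore key selects the implementation and defaults to "filesystem". A sketch (warehouse path illustrative):

    from pypaimon.api import CatalogOptions
    from pypaimon.catalog.catalog_factory import CatalogFactory

    catalog = CatalogFactory.create({
        CatalogOptions.METASTORE: "filesystem",             # optional, the default
        CatalogOptions.WAREHOUSE: "file:///tmp/warehouse",  # illustrative path
    })
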
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py
b/paimon-python/pypaimon/catalog/catalog_utils.py
index d51de0754d..d9877ce53a 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -21,7 +21,6 @@ from typing import Callable, Any
from pypaimon.api.core_options import CoreOptions
from pypaimon.api.identifier import Identifier
-from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.table.catalog_environment import CatalogEnvironment
from pypaimon.table.file_store_table import FileStoreTable
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
new file mode 100644
index 0000000000..6651eec317
--- /dev/null
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -0,0 +1,103 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from pathlib import Path
+from typing import Optional, Union
+from urllib.parse import urlparse
+
+from pypaimon import Catalog, Database, Table
+from pypaimon.api import CatalogOptions, Identifier
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.catalog.catalog_exception import TableNotExistException, DatabaseNotExistException, \
+ TableAlreadyExistException, DatabaseAlreadyExistException
+from pypaimon.schema.schema_manager import SchemaManager
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class FileSystemCatalog(Catalog):
+ def __init__(self, catalog_options: dict):
+ if CatalogOptions.WAREHOUSE not in catalog_options:
+ raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE}' path must
be set")
+ self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
+ self.catalog_options = catalog_options
+ self.file_io = None # FileIO(self.warehouse, self.catalog_options)
+
+ def get_database(self, name: str) -> Database:
+ if self.file_io.exists(self.get_database_path(name)):
+ return Database(name, {})
+ else:
+ raise DatabaseNotExistException(name)
+
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
+ try:
+ self.get_database(name)
+ if not ignore_if_exists:
+ raise DatabaseAlreadyExistException(name)
+ except DatabaseNotExistException:
+ if properties and Catalog.DB_LOCATION_PROP in properties:
+ raise ValueError("Cannot specify location for a database when
using fileSystem catalog.")
+ path = self.get_database_path(name)
+ self.file_io.mkdirs(path)
+
+ def get_table(self, identifier: Union[str, Identifier]) -> Table:
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
+ raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
+ table_path = self.get_table_path(identifier)
+ table_schema = self.get_table_schema(identifier)
+        return FileStoreTable(self.file_io, identifier, table_path, table_schema)
+
+    def create_table(self, identifier: Union[str, Identifier], schema: 'Schema', ignore_if_exists: bool):
+ if schema.options and schema.options.get(CoreOptions.AUTO_CREATE):
+ raise ValueError(f"The value of {CoreOptions.AUTO_CREATE} property
should be False.")
+
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
+ self.get_database(identifier.get_database_name())
+ try:
+ self.get_table(identifier)
+ if not ignore_if_exists:
+ raise TableAlreadyExistException(identifier)
+ except TableNotExistException:
+            if schema.options and CoreOptions.TYPE in schema.options and schema.options.get(
+                    CoreOptions.TYPE) != "table":
+ raise ValueError(f"Table Type
{schema.options.get(CoreOptions.TYPE)}")
+ table_path = self.get_table_path(identifier)
+ schema_manager = SchemaManager(self.file_io, table_path)
+ schema_manager.create_table(schema)
+
+ def get_table_schema(self, identifier: Identifier):
+ table_path = self.get_table_path(identifier)
+ table_schema = SchemaManager(self.file_io, table_path).latest()
+ if table_schema is None:
+ raise TableNotExistException(identifier)
+ return table_schema
+
+ def get_database_path(self, name) -> Path:
+ return self._trim_schema(self.warehouse) / f"{name}{Catalog.DB_SUFFIX}"
+
+ def get_table_path(self, identifier: Identifier) -> Path:
+        return self.get_database_path(identifier.get_database_name()) / identifier.get_table_name()
+
+ @staticmethod
+ def _trim_schema(warehouse_url: str) -> Path:
+ parsed = urlparse(warehouse_url)
+ bucket = parsed.netloc
+ warehouse_dir = parsed.path.lstrip('/')
+ return Path(f"{bucket}/{warehouse_dir}" if warehouse_dir else bucket)
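
The path layout this implies: _trim_schema drops the URI scheme and keeps bucket plus path, databases get the ".db" suffix, and tables nest one level below their database. A sketch using the methods above (bucket and names illustrative):

    from pypaimon.api import CatalogOptions, Identifier
    from pypaimon.catalog.filesystem_catalog import FileSystemCatalog

    fs = FileSystemCatalog({CatalogOptions.WAREHOUSE: "oss://my-bucket/warehouse"})
    print(fs.get_database_path("sales"))
    # my-bucket/warehouse/sales.db
    print(fs.get_table_path(Identifier.from_string("sales.orders")))
    # my-bucket/warehouse/sales.db/orders
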
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/common/file_io.py
index 9e7418966f..38602a709b 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -35,3 +35,7 @@ class FileIO(ABC):
@abstractmethod
def list_status(self, path: Path):
""""""
+
+ @abstractmethod
+ def mkdirs(self, path: Path) -> bool:
+ """"""
diff --git a/paimon-python/pypaimon/api/rest_json.py
b/paimon-python/pypaimon/common/rest_json.py
similarity index 100%
rename from paimon-python/pypaimon/api/rest_json.py
rename to paimon-python/pypaimon/common/rest_json.py
diff --git a/paimon-python/pypaimon/pvfs/__init__.py
b/paimon-python/pypaimon/pvfs/__init__.py
index 0351efa0eb..2b425d0251 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -33,7 +33,7 @@ from fsspec.implementations.local import LocalFileSystem
from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, Identifier, GetTableResponse
from pypaimon.api.client import NoSuchResourceException, AlreadyExistsException
-from pypaimon.api.config import RESTCatalogOptions, OssOptions, PVFSOptions
+from pypaimon.api.config import CatalogOptions, OssOptions, PVFSOptions
PROTOCOL_NAME = "pvfs"
@@ -117,9 +117,9 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
    _identifier_pattern = re.compile("^pvfs://([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$")
def __init__(self, options: Dict = None, **kwargs):
-        options.update({RESTCatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
+ options.update({CatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
self.options = options
- self.warehouse = options.get(RESTCatalogOptions.WAREHOUSE)
+ self.warehouse = options.get(CatalogOptions.WAREHOUSE)
cache_expired_time = (
PVFSOptions.DEFAULT_TABLE_CACHE_TTL
if options is None
@@ -791,7 +791,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
rest_api = self._rest_api_cache.get(catalog)
if rest_api is None:
options = self.options.copy()
- options.update({RESTCatalogOptions.WAREHOUSE: catalog})
+ options.update({CatalogOptions.WAREHOUSE: catalog})
rest_api = RESTApi(options)
self._rest_api_cache[catalog] = rest_api
return rest_api
diff --git a/paimon-python/pypaimon/rest/rest_catalog.py
b/paimon-python/pypaimon/rest/rest_catalog.py
index c1ba37cd78..2b66ea9981 100644
--- a/paimon-python/pypaimon/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/rest/rest_catalog.py
@@ -16,9 +16,10 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from pathlib import Path
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Union
-from pypaimon.api import RESTApi, RESTCatalogOptions
+from pypaimon import Database, Catalog, Schema
+from pypaimon.api import RESTApi, CatalogOptions
from pypaimon.api.api_response import PagedList, GetTableResponse
from pypaimon.api.core_options import CoreOptions
from pypaimon.api.identifier import Identifier
@@ -26,10 +27,8 @@ from pypaimon.api.options import Options
from pypaimon.schema.table_schema import TableSchema
-from pypaimon.catalog.catalog import Catalog
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_utils import CatalogUtils
-from pypaimon.catalog.database import Database
from pypaimon.catalog.property_change import PropertyChange
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.rest.rest_token_file_io import RESTTokenFileIO
@@ -41,7 +40,7 @@ class RESTCatalog(Catalog):
self.api = RESTApi(context.options.to_map(), config_required)
self.context = CatalogContext.create(Options(self.api.options),
                                              context.hadoop_conf, context.prefer_io_loader, context.fallback_io_loader)
-        self.data_token_enabled = self.api.options.get(RESTCatalogOptions.DATA_TOKEN_ENABLED)
+        self.data_token_enabled = self.api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
def list_databases(self) -> List[str]:
return self.api.list_databases()
@@ -50,7 +49,7 @@ class RESTCatalog(Catalog):
                              database_name_pattern: Optional[str] = None) -> PagedList[str]:
        return self.api.list_databases_paged(max_results, page_token, database_name_pattern)
- def create_database(self, name: str, properties: Dict[str, str] = None):
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Dict[str, str] = None):
self.api.create_database(name, properties)
def get_database(self, name: str) -> Database:
@@ -85,7 +84,9 @@ class RESTCatalog(Catalog):
table_name_pattern
)
- def get_table(self, identifier: Identifier) -> FileStoreTable:
+ def get_table(self, identifier: Union[str, Identifier]) -> FileStoreTable:
+ if not isinstance(identifier, Identifier):
+ identifier = Identifier.from_string(identifier)
return CatalogUtils.load_table(
identifier,
lambda path: self.file_io_for_data(path, identifier),
@@ -93,6 +94,9 @@ class RESTCatalog(Catalog):
self.load_table_metadata,
)
+    def create_table(self, identifier: Union[str, Identifier], schema: Schema, ignore_if_exists: bool):
+ raise ValueError("Not implemented")
+
def load_table_metadata(self, identifier: Identifier) -> TableMetadata:
response = self.api.get_table(identifier)
return self.to_table_metadata(identifier.get_database_name(), response)
diff --git a/paimon-python/pypaimon/api/data_types.py
b/paimon-python/pypaimon/schema/data_types.py
similarity index 100%
rename from paimon-python/pypaimon/api/data_types.py
rename to paimon-python/pypaimon/schema/data_types.py
diff --git a/paimon-python/pypaimon/schema/schema.py
b/paimon-python/pypaimon/schema/schema.py
index 0a826bdc8e..354a9f1d21 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -19,8 +19,8 @@ from dataclasses import dataclass
from typing import Optional, List, Dict
import pyarrow as pa
-from pypaimon.api.data_types import DataField
-from pypaimon.api.rest_json import json_field
+from pypaimon.schema.data_types import DataField
+from pypaimon.common.rest_json import json_field
@dataclass
diff --git a/paimon-python/pypaimon/schema/schema_manager.py
b/paimon-python/pypaimon/schema/schema_manager.py
index 43e773c46e..bd755d1090 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -18,8 +18,8 @@
from pathlib import Path
from typing import Optional
+from pypaimon import Schema
from pypaimon.common.file_io import FileIO
-from pypaimon.schema.schema import Schema
from pypaimon.schema.table_schema import TableSchema
diff --git a/paimon-python/pypaimon/schema/table_schema.py
b/paimon-python/pypaimon/schema/table_schema.py
index 11ee24ae98..7d3f6aea64 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -22,11 +22,11 @@ from typing import List, Dict, Optional
import pyarrow
-from pypaimon.api import data_types
+from pypaimon import Schema
+from pypaimon.schema import data_types
from pypaimon.api.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
-from pypaimon.schema.schema import Schema
-from pypaimon.api.data_types import DataField
+from pypaimon.schema.data_types import DataField
class TableSchema:
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/table/bucket_mode.py
similarity index 69%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/table/bucket_mode.py
index 9e7418966f..df6c8e949d 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/table/bucket_mode.py
@@ -15,23 +15,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+from enum import Enum, auto
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
+class BucketMode(Enum):
+ def __str__(self):
+ return self.value
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
+ HASH_FIXED = auto()
+ HASH_DYNAMIC = auto()
+ CROSS_PARTITION = auto()
+ BUCKET_UNAWARE = auto()
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 3e8f5bb166..9f230d3463 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -18,11 +18,13 @@
from pathlib import Path
+from pypaimon import Table
+from pypaimon.api.core_options import CoreOptions
from pypaimon.api.identifier import Identifier
from pypaimon.schema.table_schema import TableSchema
from pypaimon.common.file_io import FileIO
from pypaimon.schema.schema_manager import SchemaManager
-from pypaimon.table.table import Table
+from pypaimon.table.bucket_mode import BucketMode
class FileStoreTable(Table):
@@ -40,3 +42,17 @@ class FileStoreTable(Table):
self.table_schema = table_schema
self.schema_manager = SchemaManager(file_io, table_path)
self.is_primary_key_table = bool(self.primary_keys)
+
+ def bucket_mode(self) -> BucketMode:
+ if self.is_primary_key_table:
+ if self.primary_keys == self.partition_keys:
+ return BucketMode.CROSS_PARTITION
+ if self.options.get(CoreOptions.BUCKET, -1) == -1:
+ return BucketMode.HASH_DYNAMIC
+ else:
+ return BucketMode.HASH_FIXED
+ else:
+ if self.options.get(CoreOptions.BUCKET, -1) == -1:
+ return BucketMode.BUCKET_UNAWARE
+ else:
+ return BucketMode.HASH_FIXED
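
For quick reference, the selection logic above reduces to:

    primary-key table, primary keys == partition keys -> CROSS_PARTITION
    primary-key table, bucket unset (-1, the default) -> HASH_DYNAMIC
    primary-key table, bucket fixed                   -> HASH_FIXED
    append-only table, bucket unset (-1, the default) -> BUCKET_UNAWARE
    append-only table, bucket fixed                   -> HASH_FIXED
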
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/table/row/__init__.py
similarity index 69%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/table/row/__init__.py
index 9e7418966f..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/table/row/__init__.py
@@ -15,23 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
-
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
diff --git a/paimon-python/pypaimon/table/row/binary_row.py
b/paimon-python/pypaimon/table/row/binary_row.py
new file mode 100644
index 0000000000..aff9ffac05
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -0,0 +1,469 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+from dataclasses import dataclass
+from datetime import datetime, timezone, timedelta
+from decimal import Decimal
+from typing import List, Any
+
+from pypaimon.schema.data_types import DataField, DataType, AtomicType
+from pypaimon.table.row.row_kind import RowKind
+
+
+@dataclass
+class BinaryRow:
+ values: List[Any]
+ fields: List[DataField]
+ row_kind: RowKind = RowKind.INSERT
+
+ def to_dict(self):
+        return {self.fields[i].name: self.values[i] for i in range(len(self.fields))}
+
+
+class BinaryRowDeserializer:
+ HEADER_SIZE_IN_BITS = 8
+ MAX_FIX_PART_DATA_SIZE = 7
+ HIGHEST_FIRST_BIT = 0x80 << 56
+ HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
+
+ @classmethod
+ def from_bytes(
+ cls,
+ bytes_data: bytes,
+ data_fields: List[DataField]
+ ) -> BinaryRow:
+ if not bytes_data:
+ return BinaryRow([], data_fields)
+
+ arity = len(data_fields)
+ actual_data = bytes_data
+ if len(bytes_data) >= 4:
+ arity_from_bytes = struct.unpack('>i', bytes_data[:4])[0]
+ if 0 < arity_from_bytes < 1000:
+ actual_data = bytes_data[4:]
+
+ fields = []
+ null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
+ for i, data_field in enumerate(data_fields):
+ value = None
+ if not cls._is_null_at(actual_data, 0, i):
+                value = cls._parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type)
+ fields.append(value)
+
+ return BinaryRow(fields, data_fields, RowKind(actual_data[0]))
+
+ @classmethod
+ def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
+ return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8
+
+ @classmethod
+ def _is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool:
+ index = pos + cls.HEADER_SIZE_IN_BITS
+ byte_index = offset + (index // 8)
+ bit_index = index % 8
+ return (bytes_data[byte_index] & (1 << bit_index)) != 0
+
+ @classmethod
+ def _parse_field_value(
+ cls,
+ bytes_data: bytes,
+ base_offset: int,
+ null_bits_size_in_bytes: int,
+ pos: int,
+ data_type: DataType
+ ) -> Any:
+ if not isinstance(data_type, AtomicType):
+ raise ValueError(f"BinaryRow only support AtomicType yet, meet
{data_type.__class__}")
+ field_offset = base_offset + null_bits_size_in_bytes + pos * 8
+ if field_offset >= len(bytes_data):
+ raise ValueError(f"Field offset {field_offset} exceeds data length
{len(bytes_data)}")
+ type_name = data_type.type.upper()
+
+ if type_name in ['BOOLEAN', 'BOOL']:
+ return cls._parse_boolean(bytes_data, field_offset)
+ elif type_name in ['TINYINT', 'BYTE']:
+ return cls._parse_byte(bytes_data, field_offset)
+ elif type_name in ['SMALLINT', 'SHORT']:
+ return cls._parse_short(bytes_data, field_offset)
+ elif type_name in ['INT', 'INTEGER']:
+ return cls._parse_int(bytes_data, field_offset)
+ elif type_name in ['BIGINT', 'LONG']:
+ return cls._parse_long(bytes_data, field_offset)
+ elif type_name in ['FLOAT', 'REAL']:
+ return cls._parse_float(bytes_data, field_offset)
+ elif type_name in ['DOUBLE']:
+ return cls._parse_double(bytes_data, field_offset)
+ elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
+ return cls._parse_string(bytes_data, base_offset, field_offset)
+ elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
+ return cls._parse_binary(bytes_data, base_offset, field_offset)
+ elif type_name in ['DECIMAL', 'NUMERIC']:
+            return cls._parse_decimal(bytes_data, base_offset, field_offset, data_type)
+ elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
+            return cls._parse_timestamp(bytes_data, base_offset, field_offset, data_type)
+ elif type_name in ['DATE']:
+ return cls._parse_date(bytes_data, field_offset)
+ elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
+ return cls._parse_time(bytes_data, field_offset)
+ else:
+ return cls._parse_string(bytes_data, base_offset, field_offset)
+
+ @classmethod
+ def _parse_boolean(cls, bytes_data: bytes, field_offset: int) -> bool:
+ return bytes_data[field_offset] != 0
+
+ @classmethod
+ def _parse_byte(cls, bytes_data: bytes, field_offset: int) -> int:
+        return struct.unpack('<b', bytes_data[field_offset:field_offset + 1])[0]
+
+ @classmethod
+ def _parse_short(cls, bytes_data: bytes, field_offset: int) -> int:
+        return struct.unpack('<h', bytes_data[field_offset:field_offset + 2])[0]
+
+ @classmethod
+ def _parse_int(cls, bytes_data: bytes, field_offset: int) -> int:
+ if field_offset + 4 > len(bytes_data):
+ raise ValueError(f"Not enough bytes for INT: need 4, have
{len(bytes_data) - field_offset}")
+ return struct.unpack('<i', bytes_data[field_offset:field_offset +
4])[0]
+
+ @classmethod
+ def _parse_long(cls, bytes_data: bytes, field_offset: int) -> int:
+ if field_offset + 8 > len(bytes_data):
+ raise ValueError(f"Not enough bytes for LONG: need 8, have
{len(bytes_data) - field_offset}")
+ return struct.unpack('<q', bytes_data[field_offset:field_offset +
8])[0]
+
+ @classmethod
+ def _parse_float(cls, bytes_data: bytes, field_offset: int) -> float:
+        return struct.unpack('<f', bytes_data[field_offset:field_offset + 4])[0]
+
+ @classmethod
+ def _parse_double(cls, bytes_data: bytes, field_offset: int) -> float:
+ if field_offset + 8 > len(bytes_data):
+ raise ValueError(f"Not enough bytes for DOUBLE: need 8, have
{len(bytes_data) - field_offset}")
+ return struct.unpack('<d', bytes_data[field_offset:field_offset +
8])[0]
+
+ @classmethod
+    def _parse_string(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> str:
+ if field_offset + 8 > len(bytes_data):
+ raise ValueError(f"Not enough bytes for STRING offset: need 8,
have {len(bytes_data) - field_offset}")
+
+ offset_and_len = struct.unpack('<q',
bytes_data[field_offset:field_offset + 8])[0]
+ mark = offset_and_len & cls.HIGHEST_FIRST_BIT
+ if mark == 0:
+ sub_offset = (offset_and_len >> 32) & 0xFFFFFFFF
+ length = offset_and_len & 0xFFFFFFFF
+ actual_string_offset = base_offset + sub_offset
+ if actual_string_offset + length > len(bytes_data):
+ raise ValueError(
+ f"String data out of bounds:
actual_offset={actual_string_offset}, length={length}, "
+ f"total_length={len(bytes_data)}")
+ string_data = bytes_data[actual_string_offset:actual_string_offset
+ length]
+ return string_data.decode('utf-8')
+ else:
+ length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
+ start_offset = field_offset
+ if start_offset + length > len(bytes_data):
+ raise ValueError(f"Compact string data out of bounds:
length={length}")
+ string_data = bytes_data[start_offset:start_offset + length]
+ return string_data.decode('utf-8')
+
+ @classmethod
+    def _parse_binary(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> bytes:
+        offset_and_len = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
+ mark = offset_and_len & cls.HIGHEST_FIRST_BIT
+ if mark == 0:
+ sub_offset = (offset_and_len >> 32) & 0xFFFFFFFF
+ length = offset_and_len & 0xFFFFFFFF
+            return bytes_data[base_offset + sub_offset:base_offset + sub_offset + length]
+ else:
+ length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
+ return bytes_data[field_offset + 1:field_offset + 1 + length]
+
+ @classmethod
+    def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
+        unscaled_long = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
+ type_str = str(data_type)
+ if '(' in type_str and ')' in type_str:
+ try:
+ precision_scale = type_str.split('(')[1].split(')')[0]
+ if ',' in precision_scale:
+ scale = int(precision_scale.split(',')[1])
+ else:
+ scale = 0
+ except:
+ scale = 0
+ else:
+ scale = 0
+ return Decimal(unscaled_long) / (10 ** scale)
+
+ @classmethod
+    def _parse_timestamp(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> datetime:
+        millis = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
+ return datetime.fromtimestamp(millis / 1000.0, tz=timezone.utc)
+
+ @classmethod
+ def _parse_date(cls, bytes_data: bytes, field_offset: int) -> datetime:
+        days = struct.unpack('<i', bytes_data[field_offset:field_offset + 4])[0]
+ return datetime(1970, 1, 1) + timedelta(days=days)
+
+ @classmethod
+ def _parse_time(cls, bytes_data: bytes, field_offset: int) -> datetime:
+        millis = struct.unpack('<i', bytes_data[field_offset:field_offset + 4])[0]
+ seconds = millis // 1000
+ microseconds = (millis % 1000) * 1000
+ return datetime(1970, 1, 1).replace(
+ hour=seconds // 3600,
+ minute=(seconds % 3600) // 60,
+ second=seconds % 60,
+ microsecond=microseconds
+ )
+
+
+class BinaryRowSerializer:
+ HEADER_SIZE_IN_BITS = 8
+ MAX_FIX_PART_DATA_SIZE = 7
+ HIGHEST_FIRST_BIT = 0x80 << 56
+ HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
+
+ @classmethod
+ def to_bytes(cls, binary_row: BinaryRow) -> bytes:
+ if not binary_row.values:
+ return b''
+
+ arity = len(binary_row.fields)
+ null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
+ fixed_part_size = null_bits_size_in_bytes + arity * 8
+ fixed_part = bytearray(fixed_part_size)
+ fixed_part[0] = binary_row.row_kind.value
+
+ for i, value in enumerate(binary_row.values):
+ if value is None:
+ cls._set_null_bit(fixed_part, 0, i)
+
+ variable_data = []
+ variable_offsets = []
+ current_offset = 0
+
+        for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)):
+ if value is None:
+                struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, 0)
+ variable_data.append(b'')
+ variable_offsets.append(0)
+ continue
+
+ field_offset = null_bits_size_in_bytes + i * 8
+ if not isinstance(field.type, AtomicType):
+ raise ValueError(f"BinaryRow only support AtomicType yet, meet
{field.type.__class__}")
+ if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR',
'BINARY', 'VARBINARY', 'BYTES']:
+ if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR']:
+ if isinstance(value, str):
+ value_bytes = value.encode('utf-8')
+ else:
+ value_bytes = bytes(value)
+ else:
+ if isinstance(value, bytes):
+ value_bytes = value
+ else:
+ value_bytes = bytes(value)
+
+ length = len(value_bytes)
+ if length <= cls.MAX_FIX_PART_DATA_SIZE:
+                    fixed_part[field_offset:field_offset + length] = value_bytes
+ for j in range(length, 8):
+ fixed_part[field_offset + j] = 0
+                    packed_long = struct.unpack_from('<q', fixed_part, field_offset)[0]
+
+                    offset_and_len = packed_long | (length << 56) | cls.HIGHEST_FIRST_BIT
+ if offset_and_len > 0x7FFFFFFFFFFFFFFF:
+ offset_and_len = offset_and_len - 0x10000000000000000
+                    struct.pack_into('<q', fixed_part, field_offset, offset_and_len)
+ variable_data.append(b'')
+ variable_offsets.append(0)
+ else:
+ variable_data.append(value_bytes)
+ variable_offsets.append(current_offset)
+ current_offset += len(value_bytes)
+                    offset_and_len = (variable_offsets[i] << 32) | len(variable_data[i])
+                    struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, offset_and_len)
+ else:
+ if field.type.type.upper() in ['BOOLEAN', 'BOOL']:
+                    struct.pack_into('<b', fixed_part, field_offset, 1 if value else 0)
+ elif field.type.type.upper() in ['TINYINT', 'BYTE']:
+ struct.pack_into('<b', fixed_part, field_offset, value)
+ elif field.type.type.upper() in ['SMALLINT', 'SHORT']:
+ struct.pack_into('<h', fixed_part, field_offset, value)
+ elif field.type.type.upper() in ['INT', 'INTEGER']:
+ struct.pack_into('<i', fixed_part, field_offset, value)
+ elif field.type.type.upper() in ['BIGINT', 'LONG']:
+ struct.pack_into('<q', fixed_part, field_offset, value)
+ elif field.type.type.upper() in ['FLOAT', 'REAL']:
+ struct.pack_into('<f', fixed_part, field_offset, value)
+ elif field.type.type.upper() in ['DOUBLE']:
+ struct.pack_into('<d', fixed_part, field_offset, value)
+ else:
+ field_bytes = cls._serialize_field_value(value, field.type)
+                    fixed_part[field_offset:field_offset + len(field_bytes)] = field_bytes
+
+ variable_data.append(b'')
+ variable_offsets.append(0)
+
+ result = bytes(fixed_part) + b''.join(variable_data)
+ return result
+
+ @classmethod
+ def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
+ return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8
+
+ @classmethod
+    def _set_null_bit(cls, bytes_data: bytearray, offset: int, pos: int) -> None:
+ index = pos + cls.HEADER_SIZE_IN_BITS
+ byte_index = offset + (index // 8)
+ bit_index = index % 8
+ bytes_data[byte_index] |= (1 << bit_index)
+
+ @classmethod
+    def _serialize_field_value(cls, value: Any, data_type: AtomicType) -> bytes:
+ type_name = data_type.type.upper()
+
+ if type_name in ['BOOLEAN', 'BOOL']:
+ return cls._serialize_boolean(value)
+ elif type_name in ['TINYINT', 'BYTE']:
+ return cls._serialize_byte(value)
+ elif type_name in ['SMALLINT', 'SHORT']:
+ return cls._serialize_short(value)
+ elif type_name in ['INT', 'INTEGER']:
+ return cls._serialize_int(value)
+ elif type_name in ['BIGINT', 'LONG']:
+ return cls._serialize_long(value)
+ elif type_name in ['FLOAT', 'REAL']:
+ return cls._serialize_float(value)
+ elif type_name in ['DOUBLE']:
+ return cls._serialize_double(value)
+ elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
+ return cls._serialize_string(value)
+ elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
+ return cls._serialize_binary(value)
+ elif type_name in ['DECIMAL', 'NUMERIC']:
+ return cls._serialize_decimal(value, data_type)
+ elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
+ return cls._serialize_timestamp(value)
+ elif type_name in ['DATE']:
+ return cls._serialize_date(value)
+ elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
+ return cls._serialize_time(value)
+ else:
+ return cls._serialize_string(str(value))
+
+ @classmethod
+ def _serialize_boolean(cls, value: bool) -> bytes:
+ return struct.pack('<b', 1 if value else 0)
+
+ @classmethod
+ def _serialize_byte(cls, value: int) -> bytes:
+ return struct.pack('<b', value)
+
+ @classmethod
+ def _serialize_short(cls, value: int) -> bytes:
+ return struct.pack('<h', value)
+
+ @classmethod
+ def _serialize_int(cls, value: int) -> bytes:
+ return struct.pack('<i', value)
+
+ @classmethod
+ def _serialize_long(cls, value: int) -> bytes:
+ return struct.pack('<q', value)
+
+ @classmethod
+ def _serialize_float(cls, value: float) -> bytes:
+ return struct.pack('<f', value)
+
+ @classmethod
+ def _serialize_double(cls, value: float) -> bytes:
+ return struct.pack('<d', value)
+
+ @classmethod
+ def _serialize_string(cls, value) -> bytes:
+ if isinstance(value, str):
+ value_bytes = value.encode('utf-8')
+ else:
+ value_bytes = bytes(value)
+
+ length = len(value_bytes)
+
+ offset_and_len = (0x80 << 56) | (length << 56)
+ if offset_and_len > 0x7FFFFFFFFFFFFFFF:
+ offset_and_len = offset_and_len - 0x10000000000000000
+ return struct.pack('<q', offset_and_len)
+
+ @classmethod
+ def _serialize_binary(cls, value: bytes) -> bytes:
+ if isinstance(value, bytes):
+ data_bytes = value
+ else:
+ data_bytes = bytes(value)
+ length = len(data_bytes)
+ offset_and_len = (0x80 << 56) | (length << 56)
+ if offset_and_len > 0x7FFFFFFFFFFFFFFF:
+ offset_and_len = offset_and_len - 0x10000000000000000
+ return struct.pack('<q', offset_and_len)
+
+ @classmethod
+ def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
+ type_str = str(data_type)
+ if '(' in type_str and ')' in type_str:
+ try:
+ precision_scale = type_str.split('(')[1].split(')')[0]
+ if ',' in precision_scale:
+ scale = int(precision_scale.split(',')[1])
+ else:
+ scale = 0
+ except:
+ scale = 0
+ else:
+ scale = 0
+
+ unscaled_value = int(value * (10 ** scale))
+ return struct.pack('<q', unscaled_value)
+
+ @classmethod
+ def _serialize_timestamp(cls, value: datetime) -> bytes:
+ if value.tzinfo is None:
+ value = value.replace(tzinfo=timezone.utc)
+ millis = int(value.timestamp() * 1000)
+ return struct.pack('<q', millis)
+
+ @classmethod
+ def _serialize_date(cls, value: datetime) -> bytes:
+ if isinstance(value, datetime):
+ epoch = datetime(1970, 1, 1)
+ days = (value - epoch).days
+ else:
+ epoch = datetime(1970, 1, 1)
+ days = (value - epoch).days
+ return struct.pack('<i', days)
+
+ @classmethod
+ def _serialize_time(cls, value: datetime) -> bytes:
+ if isinstance(value, datetime):
+ midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
+ millis = int((value - midnight).total_seconds() * 1000)
+ else:
+            millis = value.hour * 3600000 + value.minute * 60000 + value.second * 1000 + value.microsecond // 1000
+ return struct.pack('<i', millis)
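
A hedged round-trip sketch for the serializer pair above; the DataField and AtomicType constructor arguments are assumed from how this diff uses them:

    from pypaimon.schema.data_types import AtomicType, DataField
    from pypaimon.table.row.binary_row import (
        BinaryRow, BinaryRowDeserializer, BinaryRowSerializer)

    fields = [
        DataField(0, "id", AtomicType("INT")),
        DataField(1, "name", AtomicType("STRING")),
    ]
    row = BinaryRow(values=[42, "paimon"], fields=fields)

    # "paimon" (6 bytes) fits the 7-byte compact in-place path; longer
    # strings land in the variable-length section after the fixed part.
    data = BinaryRowSerializer.to_bytes(row)
    back = BinaryRowDeserializer.from_bytes(data, fields)
    print(back.to_dict())  # expected: {'id': 42, 'name': 'paimon'}
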
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/table/row/internal_row.py
similarity index 52%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/table/row/internal_row.py
index 9e7418966f..ca19ebcddf 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/table/row/internal_row.py
@@ -15,23 +15,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+
from abc import ABC, abstractmethod
-from pathlib import Path
+from typing import Any
+
+from pypaimon.table.row.row_kind import RowKind
-class FileIO(ABC):
+class InternalRow(ABC):
+ """
+ Base interface for an internal data structure representing data of RowType.
+ """
+
@abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+ def get_field(self, pos: int) -> Any:
+ """
+ Returns the value at the given position.
+ """
@abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
+ def is_null_at(self, pos: int) -> bool:
+ """
+ Returns true if the element is null at the given position.
+ """
@abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
+ def get_row_kind(self) -> RowKind:
+ """
+ Returns the kind of change that this row describes in a changelog.
+ """
@abstractmethod
- def list_status(self, path: Path):
- """"""
+ def __len__(self) -> int:
+ """
+ Returns the number of fields in this row.
+ The number does not include RowKind. It is kept separately.
+ """
+
+ def __str__(self) -> str:
+ fields = []
+ for pos in range(self.__len__()):
+ value = self.get_field(pos)
+ fields.append(str(value))
+ return " ".join(fields)
diff --git a/paimon-python/pypaimon/table/row/key_value.py
b/paimon-python/pypaimon/table/row/key_value.py
new file mode 100644
index 0000000000..22647c4b6d
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/key_value.py
@@ -0,0 +1,57 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.table.row.offset_row import OffsetRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class KeyValue:
+ """A key value, including user key, sequence number, value kind and
value."""
+
+ def __init__(self, key_arity: int, value_arity: int):
+ self.key_arity = key_arity
+ self.value_arity = value_arity
+
+ self._row_tuple = None
+ self._reused_key = OffsetRow(None, 0, key_arity)
+ self._reused_value = OffsetRow(None, key_arity + 2, value_arity)
+
+ def replace(self, row_tuple: tuple):
+ self._row_tuple = row_tuple
+ self._reused_key.replace(row_tuple)
+ self._reused_value.replace(row_tuple)
+ return self
+
+ def is_add(self) -> bool:
+ return RowKind.is_add_byte(self.value_row_kind_byte)
+
+ @property
+ def key(self) -> OffsetRow:
+ return self._reused_key
+
+ @property
+ def value(self) -> OffsetRow:
+ return self._reused_value
+
+ @property
+ def sequence_number(self) -> int:
+ return self._row_tuple[self.key_arity]
+
+ @property
+ def value_row_kind_byte(self) -> int:
+ return self._row_tuple[self.key_arity + 1]
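
A hedged sketch of the tuple layout KeyValue expects: key fields, then the sequence number, then the row-kind byte, then value fields (the byte value 0 is assumed here to mean INSERT):

    from pypaimon.table.row.key_value import KeyValue

    kv = KeyValue(key_arity=1, value_arity=2)
    kv.replace((10, 7, 0, "alice", 30))

    print(kv.key.get_field(0))    # 10
    print(kv.sequence_number)     # 7
    print(kv.value.get_field(0))  # alice
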
diff --git a/paimon-python/pypaimon/table/row/offset_row.py
b/paimon-python/pypaimon/table/row/offset_row.py
new file mode 100644
index 0000000000..dc3f37954a
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/offset_row.py
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class OffsetRow(InternalRow):
+ """A InternalRow to wrap row with offset."""
+
+ def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int):
+ self.row_tuple = row_tuple
+ self.offset = offset
+ self.arity = arity
+ self.row_kind_byte: int = RowKind.INSERT.value  # default to +I
+
+ def replace(self, row_tuple: tuple) -> 'OffsetRow':
+ self.row_tuple = row_tuple
+ if self.offset + self.arity > len(row_tuple):
+ raise ValueError(f"Offset {self.offset} plus arity {self.arity} exceeds row length {len(row_tuple)}")
+ return self
+
+ def get_field(self, pos: int):
+ if pos >= self.arity:
+ raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}")
+ return self.row_tuple[self.offset + pos]
+
+ def is_null_at(self, pos: int) -> bool:
+ return self.get_field(pos) is None
+
+ def get_row_kind(self) -> RowKind:
+ return RowKind(self.row_kind_byte)
+
+ def set_row_kind_byte(self, row_kind_byte: int) -> None:
+ """
+ Store the RowKind as a raw byte; the enum is instantiated lazily in get_row_kind() to avoid per-row overhead.
+ """
+ self.row_kind_byte = row_kind_byte
+
+ def __len__(self) -> int:
+ return self.arity
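
A minimal usage sketch (not part of this commit) showing how OffsetRow exposes a window over a flat tuple without copying:

    from pypaimon.table.row.offset_row import OffsetRow
    from pypaimon.table.row.row_kind import RowKind

    # View the last two fields of a four-field tuple.
    row = OffsetRow(None, offset=2, arity=2).replace(("k1", 9, "v1", None))
    print(row.get_field(0))    # v1
    print(row.is_null_at(1))   # True
    row.set_row_kind_byte(RowKind.DELETE.value)
    print(row.get_row_kind())  # RowKind.DELETE
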
diff --git a/paimon-python/pypaimon/table/row/row_kind.py
b/paimon-python/pypaimon/table/row/row_kind.py
new file mode 100644
index 0000000000..06a2904fb3
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/row_kind.py
@@ -0,0 +1,61 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from enum import Enum
+
+
+class RowKind(Enum):
+ INSERT = 0  # +I: Insertion operation.
+ UPDATE_BEFORE = 1  # -U: Update operation with the previous content of the updated row.
+ UPDATE_AFTER = 2  # +U: Update operation with the new content of the updated row.
+ DELETE = 3  # -D: Deletion operation.
+
+ def is_add(self) -> bool:
+ return self in (RowKind.INSERT, RowKind.UPDATE_AFTER)
+
+ def to_string(self) -> str:
+ if self == RowKind.INSERT:
+ return "+I"
+ elif self == RowKind.UPDATE_BEFORE:
+ return "-U"
+ elif self == RowKind.UPDATE_AFTER:
+ return "+U"
+ elif self == RowKind.DELETE:
+ return "-D"
+ else:
+ return "??"
+
+ @staticmethod
+ def from_string(kind_str: str) -> 'RowKind':
+ if kind_str == "+I":
+ return RowKind.INSERT
+ elif kind_str == "-U":
+ return RowKind.UPDATE_BEFORE
+ elif kind_str == "+U":
+ return RowKind.UPDATE_AFTER
+ elif kind_str == "-D":
+ return RowKind.DELETE
+ else:
+ raise ValueError(f"Unknown row kind string: {kind_str}")
+
+ @classmethod
+ def is_add_byte(cls, byte: int) -> bool:
+ """
+ Check from the raw byte whether the kind is an add (+I or +U), avoiding creation of RowKind objects and reducing GC pressure.
+ """
+ return byte == 0 or byte == 2
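
A short round-trip sketch (illustration only) of the string form, the enum, and the byte fast path:

    from pypaimon.table.row.row_kind import RowKind

    kind = RowKind.from_string("+U")
    print(kind.to_string(), kind.is_add())  # +U True
    # The byte fast path skips enum construction on hot read paths.
    print(RowKind.is_add_byte(RowKind.UPDATE_AFTER.value))  # True
    print(RowKind.is_add_byte(RowKind.DELETE.value))        # False
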
diff --git a/paimon-python/pypaimon/tests/api_test.py
b/paimon-python/pypaimon/tests/api_test.py
index 8012a5a364..9a604dad60 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -26,11 +26,11 @@ from ..api import RESTApi
from ..api.auth import BearTokenAuthProvider
from ..api.identifier import Identifier
from ..api.options import Options
-from ..api.rest_json import JSON
+from pypaimon.common.rest_json import JSON
from pypaimon.schema.table_schema import TableSchema
from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
-from ..api.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType, DataField
+from pypaimon.schema.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType, DataField
from ..catalog.catalog_context import CatalogContext
from ..catalog.table_metadata import TableMetadata
from ..rest.rest_catalog import RESTCatalog
@@ -261,8 +261,8 @@ class ApiTestCase(unittest.TestCase):
server.start()
ecs_metadata_url = f"http://localhost:{server.port}/ram/security-credential/"
options = {
- api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
- api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url
+ api.CatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+ api.CatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url
}
loader = DLFTokenLoaderFactory.create_token_loader(options)
load_token = loader.load_token()
@@ -271,9 +271,9 @@ class ApiTestCase(unittest.TestCase):
self.assertEqual(load_token.security_token, token.security_token)
self.assertEqual(load_token.expiration, token.expiration)
options_with_role = {
- api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
- api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url,
- api.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: role_name,
+ api.CatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+ api.CatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url,
+ api.CatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: role_name,
}
loader = DLFTokenLoaderFactory.create_token_loader(options_with_role)
token = loader.load_token()
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py
b/paimon-python/pypaimon/tests/pvfs_test.py
index c20f337302..b81914a347 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -24,7 +24,7 @@ from pathlib import Path
from pypaimon.api import ConfigResponse
from pypaimon.api.auth import BearTokenAuthProvider
-from pypaimon.api.data_types import DataField, AtomicType
+from pypaimon.schema.data_types import DataField, AtomicType
from pypaimon.schema.table_schema import TableSchema
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.pvfs import PaimonVirtualFileSystem
diff --git a/paimon-python/pypaimon/tests/rest_server.py
b/paimon-python/pypaimon/tests/rest_server.py
index 27ba27907c..44be0e9e42 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -32,8 +32,10 @@ from ..api import RenameTableRequest, CreateTableRequest, CreateDatabaseRequest,
from ..api.api_response import (ConfigResponse, ListDatabasesResponse,
GetDatabaseResponse,
Schema, GetTableResponse, ListTablesResponse,
RESTResponse, PagedList)
-from ..api.rest_json import JSON
+from pypaimon.common.rest_json import JSON
from pypaimon.schema.table_schema import TableSchema
+from ..catalog.catalog_exception import DatabaseNoPermissionException, TableNotExistException, \
+ DatabaseNotExistException, TableNoPermissionException
from ..catalog.table_metadata import TableMetadata
@@ -57,143 +59,6 @@ class ErrorResponse(RESTResponse):
code: int
-# Exception classes
-class CatalogException(Exception):
- """Base catalog exception"""
-
-
-class DatabaseNotExistException(CatalogException):
- """Database not exist exception"""
-
- def __init__(self, database: str):
- self.database = database
- super().__init__(f"Database {database} does not exist")
-
-
-class DatabaseAlreadyExistException(CatalogException):
- """Database already exist exception"""
-
- def __init__(self, database: str):
- self.database = database
- super().__init__(f"Database {database} already exists")
-
-
-class DatabaseNoPermissionException(CatalogException):
- """Database no permission exception"""
-
- def __init__(self, database: str):
- self.database = database
- super().__init__(f"No permission to access database {database}")
-
-
-class TableNotExistException(CatalogException):
- """Table not exist exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"Table {identifier.get_full_name()} does not exist")
-
-
-class TableAlreadyExistException(CatalogException):
- """Table already exist exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"Table {identifier.get_full_name()} already exists")
-
-
-class TableNoPermissionException(CatalogException):
- """Table no permission exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"No permission to access table
{identifier.get_full_name()}")
-
-
-class ViewNotExistException(CatalogException):
- """View not exist exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"View {identifier.get_full_name()} does not exist")
-
-
-class ViewAlreadyExistException(CatalogException):
- """View already exist exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"View {identifier.get_full_name()} already exists")
-
-
-class FunctionNotExistException(CatalogException):
- """Function not exist exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"Function {identifier.get_full_name()} does not
exist")
-
-
-class FunctionAlreadyExistException(CatalogException):
- """Function already exist exception"""
-
- def __init__(self, identifier: Identifier):
- self.identifier = identifier
- super().__init__(f"Function {identifier.get_full_name()} already
exists")
-
-
-class ColumnNotExistException(CatalogException):
- """Column not exist exception"""
-
- def __init__(self, column: str):
- self.column = column
- super().__init__(f"Column {column} does not exist")
-
-
-class ColumnAlreadyExistException(CatalogException):
- """Column already exist exception"""
-
- def __init__(self, column: str):
- self.column = column
- super().__init__(f"Column {column} already exists")
-
-
-class DefinitionNotExistException(CatalogException):
- """Definition not exist exception"""
-
- def __init__(self, identifier: Identifier, name: str):
- self.identifier = identifier
- self.name = name
- super().__init__(f"Definition {name} does not exist in
{identifier.get_full_name()}")
-
-
-class DefinitionAlreadyExistException(CatalogException):
- """Definition already exist exception"""
-
- def __init__(self, identifier: Identifier, name: str):
- self.identifier = identifier
- self.name = name
- super().__init__(f"Definition {name} already exists in
{identifier.get_full_name()}")
-
-
-class DialectNotExistException(CatalogException):
- """Dialect not exist exception"""
-
- def __init__(self, identifier: Identifier, dialect: str):
- self.identifier = identifier
- self.dialect = dialect
- super().__init__(f"Dialect {dialect} does not exist in
{identifier.get_full_name()}")
-
-
-class DialectAlreadyExistException(CatalogException):
- """Dialect already exist exception"""
-
- def __init__(self, identifier: Identifier, dialect: str):
- self.identifier = identifier
- self.dialect = dialect
- super().__init__(f"Dialect {dialect} already exists in
{identifier.get_full_name()}")
-
-
# Constants
DEFAULT_MAX_RESULTS = 100
AUTHORIZATION_HEADER_KEY = "Authorization"