This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3abee4d3c7 [python] api:  support ecs role auth (#5913)
3abee4d3c7 is described below

commit 3abee4d3c753ce028f1ad94c87656de2c7e8064e
Author: jerry <[email protected]>
AuthorDate: Mon Jul 21 13:42:53 2025 +0800

    [python] api:  support ecs role auth (#5913)
---
 paimon-python/pypaimon/api/__init__.py     |  67 ++++-----
 paimon-python/pypaimon/api/auth.py         |  60 ++++----
 paimon-python/pypaimon/api/client.py       |  24 ++--
 paimon-python/pypaimon/api/token_loader.py | 215 +++++++++++++++++++++++++++++
 paimon-python/pypaimon/api/typedef.py      |  26 +++-
 paimon-python/pypaimon/tests/api_test.py   |  72 ++++++++--
 6 files changed, 373 insertions(+), 91 deletions(-)

diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 9920eef4bb..6bafc45d98 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -18,34 +18,17 @@
 import logging
 from typing import Dict, List, Optional, Callable
 from urllib.parse import unquote
-from .auth import RESTAuthFunction
-from .api_response import (
-    PagedList,
-    GetTableResponse,
-    ListDatabasesResponse,
-    ListTablesResponse,
-    GetDatabaseResponse,
-    ConfigResponse,
-    PagedResponse,
-)
+
+from .api_response import PagedList, GetTableResponse, ListDatabasesResponse, ListTablesResponse, \
+    GetDatabaseResponse, ConfigResponse, PagedResponse
 from .api_resquest import CreateDatabaseRequest, AlterDatabaseRequest
-from .typedef import Identifier
+from .typedef import Identifier, RESTCatalogOptions
 from .client import HttpClient
-from .auth import DLFAuthProvider, DLFToken
+from .auth import DLFAuthProvider, RESTAuthFunction
+from .token_loader import DLFToken, DLFTokenLoaderFactory
 from .typedef import T
 
 
-class RESTCatalogOptions:
-    URI = "uri"
-    WAREHOUSE = "warehouse"
-    TOKEN_PROVIDER = "token.provider"
-    DLF_REGION = "dlf.region"
-    DLF_ACCESS_KEY_ID = "dlf.access-key-id"
-    DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
-    DLF_ACCESS_SECURITY_TOKEN = "dlf.security-token"
-    PREFIX = "prefix"
-
-
 class RESTException(Exception):
     pass
 
@@ -133,7 +116,9 @@ class RESTApi:
         self.logger = logging.getLogger(self.__class__.__name__)
         self.client = HttpClient(options.get(RESTCatalogOptions.URI))
         auth_provider = DLFAuthProvider(
-            DLFToken(options), options.get(RESTCatalogOptions.DLF_REGION)
+            options.get(RESTCatalogOptions.DLF_REGION),
+            DLFToken.from_options(options),
+            DLFTokenLoaderFactory.create_token_loader(options)
         )
         base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX)
 
@@ -160,9 +145,9 @@ class RESTApi:
         self.resource_paths = ResourcePaths.for_catalog_properties(options)
 
     def __build_paged_query_params(
-        max_results: Optional[int],
-        page_token: Optional[str],
-        name_patterns: Dict[str, str],
+            max_results: Optional[int],
+            page_token: Optional[str],
+            name_patterns: Dict[str, str],
     ) -> Dict[str, str]:
         query_params = {}
         if max_results is not None and max_results > 0:
@@ -178,7 +163,7 @@ class RESTApi:
         return query_params
 
     def __list_data_from_page_api(
-        self, page_api: Callable[[Dict[str, str]], PagedResponse[T]]
+            self, page_api: Callable[[Dict[str, str]], PagedResponse[T]]
     ) -> List[T]:
         results = []
         query_params = {}
@@ -216,10 +201,10 @@ class RESTApi:
         )
 
     def list_databases_paged(
-        self,
-        max_results: Optional[int] = None,
-        page_token: Optional[str] = None,
-        database_name_pattern: Optional[str] = None,
+            self,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+            database_name_pattern: Optional[str] = None,
     ) -> PagedList[str]:
 
         response = self.client.get_with_params(
@@ -255,10 +240,10 @@ class RESTApi:
             self.rest_auth_function)
 
     def alter_database(
-        self,
-        name: str,
-        removals: Optional[List[str]] = None,
-        updates: Optional[Dict[str, str]] = None,
+            self,
+            name: str,
+            removals: Optional[List[str]] = None,
+            updates: Optional[Dict[str, str]] = None,
     ):
         if not name or not name.strip():
             raise ValueError("Database name cannot be empty")
@@ -282,11 +267,11 @@ class RESTApi:
         )
 
     def list_tables_paged(
-        self,
-        database_name: str,
-        max_results: Optional[int] = None,
-        page_token: Optional[str] = None,
-        table_name_pattern: Optional[str] = None,
+            self,
+            database_name: str,
+            max_results: Optional[int] = None,
+            page_token: Optional[str] = None,
+            table_name_pattern: Optional[str] = None,
     ) -> PagedList[str]:
         response = self.client.get_with_params(
             self.resource_paths.tables(database_name),
diff --git a/paimon-python/pypaimon/api/auth.py b/paimon-python/pypaimon/api/auth.py
index 83e0e44c91..b30c1ec025 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -19,36 +19,14 @@ import base64
 import hashlib
 import hmac
 import logging
-import threading
+import time
 from abc import ABC, abstractmethod
 from collections import OrderedDict
-from dataclasses import dataclass
 from datetime import datetime, timezone
 from typing import Optional, Dict
 
-
-@dataclass
-class RESTAuthParameter:
-    method: str
-    path: str
-    data: str
-    parameters: Dict[str, str]
-
-
-@dataclass
-class DLFToken:
-    access_key_id: str
-    access_key_secret: str
-    security_token: Optional[str] = None
-
-    def __init__(self, options: Dict[str, str]):
-        from . import RESTCatalogOptions
-
-        self.access_key_id = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID)
-        self.access_key_secret = options.get(
-            RESTCatalogOptions.DLF_ACCESS_KEY_SECRET)
-        self.security_token = options.get(
-            RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
+from .token_loader import DLFTokenLoader, DLFToken
+from .typedef import RESTAuthParameter
 
 
 class AuthProvider(ABC):
@@ -92,11 +70,33 @@ class DLFAuthProvider(AuthProvider):
     AUTH_DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
     MEDIA_TYPE = "application/json"
 
-    def __init__(self, token: DLFToken, region: str):
+    TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
+
+    token: DLFToken
+    token_loader: DLFTokenLoader
+    region: str
+
+    def __init__(self,
+                 region: str,
+                 token: DLFToken = None,
+                 token_loader: DLFTokenLoader = None):
         self.logger = logging.getLogger(self.__class__.__name__)
+        if token is None and token_loader is None:
+            raise ValueError("Either token or token_loader must be provided")
         self.token = token
+        self.token_loader = token_loader
         self.region = region
-        self._lock = threading.Lock()
+
+    def get_token(self):
+        if self.token_loader is not None:
+            if self.token is None:
+                self.token = self.token_loader.load_token()
+            elif self.token is not None and self.token.expiration_at_millis is not None:
+                if self.token.expiration_at_millis - int(time.time() * 1000) < self.TOKEN_EXPIRATION_SAFE_TIME_MILLIS:
+                    self.token = self.token_loader.load_token()
+        if self.token is None:
+            raise ValueError("Either token or token_loader must be provided")
+        return self.token
 
     def merge_auth_header(
         self, base_header: Dict[str, str], rest_auth_parameter: RESTAuthParameter
@@ -109,12 +109,14 @@ class DLFAuthProvider(AuthProvider):
             date = date_time[:8]
 
             sign_headers = self.generate_sign_headers(
-                rest_auth_parameter.data, date_time, self.token.security_token
+                rest_auth_parameter.data,
+                date_time,
+                self.get_token().security_token
             )
 
             authorization = DLFAuthSignature.get_authorization(
                 rest_auth_parameter=rest_auth_parameter,
-                dlf_token=self.token,
+                dlf_token=self.get_token(),
                 region=self.region,
                 headers=sign_headers,
                 date_time=date_time,
diff --git a/paimon-python/pypaimon/api/client.py b/paimon-python/pypaimon/api/client.py
index a7c69cb31d..d83f79a202 100644
--- a/paimon-python/pypaimon/api/client.py
+++ b/paimon-python/pypaimon/api/client.py
@@ -27,7 +27,7 @@ import requests
 from requests.adapters import HTTPAdapter
 from urllib3 import Retry
 
-from .auth import RESTAuthParameter
+from .typedef import RESTAuthParameter
 from .api_response import ErrorResponse
 from .rest_json import JSON
 
@@ -228,17 +228,20 @@ class DefaultErrorHandler(ErrorHandler):
         raise RESTException("Unable to process: %s", message)
 
 
-class ExponentialRetryInterceptor:
+class ExponentialRetry:
+
+    adapter: HTTPAdapter
 
     def __init__(self, max_retries: int = 5):
-        self.max_retries = max_retries
-        self.logger = logging.getLogger(self.__class__.__name__)
+        retry = self.__create_retry_strategy(max_retries)
+        self.adapter = HTTPAdapter(max_retries=retry)
 
-    def create_retry_strategy(self) -> Retry:
+    @staticmethod
+    def __create_retry_strategy(max_retries: int) -> Retry:
         retry_kwargs = {
-            'total': self.max_retries,
-            'read': self.max_retries,
-            'connect': self.max_retries,
+            'total': max_retries,
+            'read': max_retries,
+            'connect': max_retries,
             'backoff_factor': 1,
             'status_forcelist': [429, 502, 503, 504],
             'raise_on_status': False,
@@ -365,9 +368,8 @@ class HttpClient(RESTClient):
 
         self.session = requests.Session()
 
-        retry_interceptor = ExponentialRetryInterceptor(max_retries=1)
-        retry_strategy = retry_interceptor.create_retry_strategy()
-        adapter = HTTPAdapter(max_retries=retry_strategy)
+        retry_interceptor = ExponentialRetry(max_retries=3)
+        adapter = HTTPAdapter(max_retries=retry_interceptor.adapter)
 
         self.session.mount("http://", adapter)
         self.session.mount("https://", adapter)
diff --git a/paimon-python/pypaimon/api/token_loader.py b/paimon-python/pypaimon/api/token_loader.py
new file mode 100644
index 0000000000..46810a1121
--- /dev/null
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -0,0 +1,215 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timezone
+from dataclasses import dataclass
+from typing import Optional, Dict
+from urllib.parse import urljoin
+import requests
+from requests.adapters import HTTPAdapter
+from requests.exceptions import RequestException
+
+from .rest_json import json_field, JSON
+from .typedef import RESTCatalogOptions
+from .client import ExponentialRetry
+
+
+@dataclass
+class DLFToken:
+    TOKEN_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+
+    access_key_id: str = json_field('AccessKeyId')
+    access_key_secret: str = json_field('AccessKeySecret')
+    security_token: Optional[str] = json_field('SecurityToken')
+    expiration: Optional[str] = json_field('Expiration')
+    expiration_at_millis: Optional[int] = json_field('ExpirationAt', default=None)
+
+    @staticmethod
+    def parse_expiration_to_millis(expiration: str) -> int:
+        date_time = datetime.strptime(expiration, DLFToken.TOKEN_DATE_FORMAT)
+        utc_datetime = date_time.replace(tzinfo=timezone.utc)
+        expiration_at_millis = int(utc_datetime.timestamp() * 1000)
+        return expiration_at_millis
+
+    def __init__(self, access_key_id: str, access_key_secret: str,
+                 security_token: str, expiration: str = None, expiration_at_millis: int = None):
+        self.access_key_id = access_key_id
+        self.access_key_secret = access_key_secret
+        self.security_token = security_token
+        self.expiration = expiration
+        if expiration_at_millis is not None:
+            self.expiration_at_millis = expiration_at_millis
+        if expiration is not None:
+            self.expiration_at_millis = self.parse_expiration_to_millis(expiration)
+
+    @classmethod
+    def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
+        from .typedef import RESTCatalogOptions
+        if (options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID) is None
+                or options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
+            return None
+        else:
+            return cls(
+                access_key_id=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
+                access_key_secret=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
+                security_token=options.get(RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
+            )
+
+
+class DLFTokenLoader(ABC):
+
+    @abstractmethod
+    def load_token(self) -> DLFToken:
+        pass
+
+    @abstractmethod
+    def description(self) -> str:
+        pass
+
+
+class HTTPClient:
+    """HTTP client with retry and timeout configuration"""
+
+    def __init__(self, connect_timeout: int = 180, read_timeout: int = 180,
+                 max_retries: int = 3):
+        self.connect_timeout = connect_timeout
+        self.read_timeout = read_timeout
+        self.session = requests.Session()
+
+        # Add retry adapter
+        retry_interceptor = ExponentialRetry(max_retries=3)
+        adapter = HTTPAdapter(max_retries=retry_interceptor.adapter)
+
+        self.session.mount("http://", adapter)
+        self.session.mount("https://", adapter)
+
+        # Set timeouts
+        self.session.timeout = (connect_timeout, read_timeout)
+
+    def get(self, url: str, **kwargs) -> requests.Response:
+        """Make GET request with configured timeouts and retries"""
+        return self.session.get(url, timeout=(self.connect_timeout, self.read_timeout), **kwargs)
+
+    def close(self):
+        """Close the session"""
+        self.session.close()
+
+
+class DLFECSTokenLoader(DLFTokenLoader):
+    """
+    DLF ECS Token Loader implementation
+
+    This class loads DLF tokens from ECS metadata service.
+    """
+
+    # Class-level HTTP client (equivalent to static in Java)
+    _http_client: Optional[HTTPClient] = None
+
+    @classmethod
+    def get_http_client(cls) -> HTTPClient:
+        """Get or create the shared HTTP client"""
+        if cls._http_client is None:
+            cls._http_client = HTTPClient(
+                connect_timeout=180,  # 3 minutes
+                read_timeout=180,  # 3 minutes
+                max_retries=3
+            )
+        return cls._http_client
+
+    def __init__(self, ecs_metadata_url: str, role_name: Optional[str] = None):
+        """
+        Initialize DLF ECS Token Loader
+
+        Args:
+            ecs_metadata_url: ECS metadata service URL
+            role_name: Optional role name. If None, will be fetched from metadata service
+        """
+        self.ecs_metadata_url = ecs_metadata_url
+        self.role_name = role_name
+
+    def load_token(self) -> DLFToken:
+        try:
+            if self.role_name is None:
+                self.role_name = self._get_role(self.ecs_metadata_url)
+
+            token_url = urljoin(self.ecs_metadata_url.rstrip('/') + '/', self.role_name)
+            return self._get_token(token_url)
+
+        except Exception as e:
+            raise RuntimeError(f"Token loading failed: {e}") from e
+
+    def description(self) -> str:
+        return self.ecs_metadata_url
+
+    def _get_role(self, url: str) -> str:
+        try:
+            return self._get_response_body(url)
+        except Exception as e:
+            raise RuntimeError(f"Get role failed, error: {e}") from e
+
+    def _get_token(self, url: str) -> DLFToken:
+        try:
+            token_json = self._get_response_body(url)
+            return JSON.from_json(token_json, DLFToken)
+        except OSError as e:
+            # Python equivalent of UncheckedIOException
+            raise OSError(f"IO error while getting token: {e}") from e
+        except Exception as e:
+            raise RuntimeError(f"Get token failed, error: {e}") from e
+
+    def _get_response_body(self, url: str) -> str:
+        try:
+            http_client = self.get_http_client()
+            response = http_client.get(url)
+
+            if response is None:
+                raise RuntimeError("Get response failed, response is None")
+
+            if not response.ok:
+                raise RuntimeError(f"Get response failed, response: {response.status_code} {response.reason}")
+
+            response_body = response.text
+            if response_body is None:
+                raise RuntimeError("Get response failed, response body is None")
+            return response_body
+
+        except RuntimeError:
+            # Re-raise RuntimeError as-is
+            raise
+        except RequestException as e:
+            raise RuntimeError(f"Request failed: {e}") from e
+        except Exception as e:
+            raise RuntimeError(f"Get response failed, error: {e}") from e
+
+
+# Factory and utility functions
+class DLFTokenLoaderFactory:
+    """Factory for creating DLF token loaders"""
+
+    @staticmethod
+    def create_token_loader(options: Dict[str, str]) -> Optional['DLFTokenLoader']:
+        """Create ECS token loader"""
+        loader = options.get(RESTCatalogOptions.DLF_TOKEN_LOADER)
+        if loader == 'ecs':
+            ecs_metadata_url = options.get(
+                RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL,
+                'http://100.100.100.200/latest/meta-data/Ram/security-credentials/'
+            )
+            role_name = options.get(RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME)
+            return DLFECSTokenLoader(ecs_metadata_url, role_name)
+        return None
diff --git a/paimon-python/pypaimon/api/typedef.py b/paimon-python/pypaimon/api/typedef.py
index 9c79e48323..4cf65738f9 100644
--- a/paimon-python/pypaimon/api/typedef.py
+++ b/paimon-python/pypaimon/api/typedef.py
@@ -16,7 +16,7 @@
 #  under the License.
 
 from dataclasses import dataclass
-from typing import Optional, TypeVar
+from typing import Optional, TypeVar, Dict
 
 T = TypeVar("T")
 
@@ -61,4 +61,26 @@ class Identifier:
         return self.branch_name
 
     def is_system_table(self) -> bool:
-        return self.object_name.startswith("$")
+        return self.object_name.startswith('$')
+
+
+@dataclass
+class RESTAuthParameter:
+    method: str
+    path: str
+    data: str
+    parameters: Dict[str, str]
+
+
+class RESTCatalogOptions:
+    URI = "uri"
+    WAREHOUSE = "warehouse"
+    TOKEN_PROVIDER = "token.provider"
+    DLF_REGION = "dlf.region"
+    DLF_ACCESS_KEY_ID = "dlf.access-key-id"
+    DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
+    DLF_ACCESS_SECURITY_TOKEN = "dlf.security-token"
+    DLF_TOKEN_LOADER = "dlf.token-loader"
+    DLF_TOKEN_ECS_ROLE_NAME = "dlf.token-ecs-role-name"
+    DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
+    PREFIX = 'prefix'
diff --git a/paimon-python/pypaimon/tests/api_test.py b/paimon-python/pypaimon/tests/api_test.py
index 0150c4919e..d8b872c33f 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -26,10 +26,12 @@ from urllib.parse import urlparse
 import unittest
 
 import pypaimon.api as api
-from ..api.api_response import (ConfigResponse, ListDatabasesResponse, GetDatabaseResponse, TableMetadata, Schema,
-                                GetTableResponse, ListTablesResponse, TableSchema, RESTResponse, PagedList, DataField)
+from ..api.api_response import (ConfigResponse, ListDatabasesResponse, GetDatabaseResponse,
+                                TableMetadata, Schema, GetTableResponse, ListTablesResponse,
+                                TableSchema, RESTResponse, PagedList, DataField)
 from ..api import RESTApi
 from ..api.rest_json import JSON
+from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
 from ..api.typedef import Identifier
 from ..api.data_types import AtomicInteger, DataTypeParser, AtomicType, ArrayType, MapType, RowType
 
@@ -240,7 +242,8 @@ OBJECT_TABLE = "OBJECT_TABLE"
 class RESTCatalogServer:
     """Mock REST server for testing"""
 
-    def __init__(self, data_path: str, auth_provider, config: ConfigResponse, warehouse: str):
+    def __init__(self, data_path: str, auth_provider, config: ConfigResponse, warehouse: str,
+                 role_name: str = None, token_json: str = None):
         self.logger = logging.getLogger(__name__)
         self.warehouse = warehouse
         self.config_response = config
@@ -259,6 +262,8 @@ class RESTCatalogServer:
         # Initialize mock catalog (simplified)
         self.data_path = data_path
         self.auth_provider = auth_provider
+        self.role_name = role_name
+        self.token_json = token_json
 
         # HTTP server setup
         self.server = None
@@ -375,6 +380,13 @@ class RESTCatalogServer:
                 if warehouse_param == self.warehouse:
                     return self._mock_response(self.config_response, 200)
 
+            # ecs role
+            if resource_path == '/ram/security-credential/':
+                return self._mock_response(self.role_name, 200)
+
+            if resource_path == f'/ram/security-credential/{self.role_name}':
+                return self._mock_response(self.token_json, 200)
+
             # Databases endpoint
             if resource_path == self.database_uri or resource_path.startswith(self.database_uri + "?"):
                 return self._databases_api_handler(method, data, parameters)
@@ -462,7 +474,6 @@ class RESTCatalogServer:
         if len(path_parts) == 3:
             # Basic table operations
             return self._table_handle(method, data, identifier)
-
         return self._mock_response(ErrorResponse(None, None, "Not Found", 404), 404)
 
     def _databases_api_handler(self, method: str, data: str,
@@ -861,7 +872,8 @@ class ApiTestCase(unittest.TestCase):
                 DataField(0, "name", AtomicType('INT'), 'desc  name'),
                 DataField(1, "arr11", ArrayType(True, AtomicType('INT')), 'desc  arr11'),
                 DataField(2, "map11", MapType(False, AtomicType('INT'),
-                                              MapType(False, AtomicType('INT'), AtomicType('INT'))), 'desc  arr11'),
+                                              MapType(False, AtomicType('INT'), AtomicType('INT'))),
+                          'desc  arr11'),
             ]
             schema = TableSchema(len(data_fields), data_fields, len(data_fields), [], [], {}, "")
             test_tables = {
@@ -888,6 +900,50 @@ class ApiTestCase(unittest.TestCase):
             server.shutdown()
             print("Server stopped")
 
-
-if __name__ == "__main__":
-    unittest.main()
+    def test_ecs_loader_token(self):
+        token = DLFToken(
+            access_key_id='AccessKeyId',
+            access_key_secret='AccessKeySecret',
+            security_token='AQoDYXdzEJr...<remainder of security token>',
+            expiration="2023-12-01T12:00:00Z"
+        )
+        token_json = JSON.to_json(token)
+        role_name = 'test_role'
+        config = ConfigResponse(defaults={"prefix": "mock-test"})
+        server = RESTCatalogServer(
+            data_path="/tmp/test_warehouse",
+            auth_provider=None,
+            config=config,
+            warehouse="test_warehouse",
+            role_name=role_name,
+            token_json=token_json
+        )
+        try:
+            # Start server
+            server.start()
+            ecs_metadata_url = f"http://localhost:{server.port}/ram/security-credential/"
+            options = {
+                api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+                api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url
+            }
+            loader = DLFTokenLoaderFactory.create_token_loader(options)
+            load_token = loader.load_token()
+            self.assertEqual(load_token.access_key_id, token.access_key_id)
+            self.assertEqual(load_token.access_key_secret, token.access_key_secret)
+            self.assertEqual(load_token.security_token, token.security_token)
+            self.assertEqual(load_token.expiration, token.expiration)
+            options_with_role = {
+                api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+                api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL: ecs_metadata_url,
+                api.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: role_name,
+            }
+            loader = DLFTokenLoaderFactory.create_token_loader(options_with_role)
+            token = loader.load_token()
+            self.assertEqual(load_token.access_key_id, token.access_key_id)
+            self.assertEqual(load_token.access_key_secret, token.access_key_secret)
+            self.assertEqual(load_token.security_token, token.security_token)
+            self.assertEqual(load_token.expiration, token.expiration)
+        finally:
+            # Shutdown server
+            server.shutdown()
+            print("Server stopped")

Reply via email to