This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3abee4d3c7 [python] api: support ecs role auth (#5913)
3abee4d3c7 is described below
commit 3abee4d3c753ce028f1ad94c87656de2c7e8064e
Author: jerry <[email protected]>
AuthorDate: Mon Jul 21 13:42:53 2025 +0800
[python] api: support ecs role auth (#5913)
---
paimon-python/pypaimon/api/__init__.py | 67 ++++-----
paimon-python/pypaimon/api/auth.py | 60 ++++----
paimon-python/pypaimon/api/client.py | 24 ++--
paimon-python/pypaimon/api/token_loader.py | 215 +++++++++++++++++++++++++++++
paimon-python/pypaimon/api/typedef.py | 26 +++-
paimon-python/pypaimon/tests/api_test.py | 72 ++++++++--
6 files changed, 373 insertions(+), 91 deletions(-)
diff --git a/paimon-python/pypaimon/api/__init__.py
b/paimon-python/pypaimon/api/__init__.py
index 9920eef4bb..6bafc45d98 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -18,34 +18,17 @@
import logging
from typing import Dict, List, Optional, Callable
from urllib.parse import unquote
-from .auth import RESTAuthFunction
-from .api_response import (
- PagedList,
- GetTableResponse,
- ListDatabasesResponse,
- ListTablesResponse,
- GetDatabaseResponse,
- ConfigResponse,
- PagedResponse,
-)
+
+from .api_response import PagedList, GetTableResponse, ListDatabasesResponse,
ListTablesResponse, \
+ GetDatabaseResponse, ConfigResponse, PagedResponse
from .api_resquest import CreateDatabaseRequest, AlterDatabaseRequest
-from .typedef import Identifier
+from .typedef import Identifier, RESTCatalogOptions
from .client import HttpClient
-from .auth import DLFAuthProvider, DLFToken
+from .auth import DLFAuthProvider, RESTAuthFunction
+from .token_loader import DLFToken, DLFTokenLoaderFactory
from .typedef import T
-class RESTCatalogOptions:
- URI = "uri"
- WAREHOUSE = "warehouse"
- TOKEN_PROVIDER = "token.provider"
- DLF_REGION = "dlf.region"
- DLF_ACCESS_KEY_ID = "dlf.access-key-id"
- DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
- DLF_ACCESS_SECURITY_TOKEN = "dlf.security-token"
- PREFIX = "prefix"
-
-
class RESTException(Exception):
pass
@@ -133,7 +116,9 @@ class RESTApi:
self.logger = logging.getLogger(self.__class__.__name__)
self.client = HttpClient(options.get(RESTCatalogOptions.URI))
auth_provider = DLFAuthProvider(
- DLFToken(options), options.get(RESTCatalogOptions.DLF_REGION)
+ options.get(RESTCatalogOptions.DLF_REGION),
+ DLFToken.from_options(options),
+ DLFTokenLoaderFactory.create_token_loader(options)
)
base_headers = RESTUtil.extract_prefix_map(options, self.HEADER_PREFIX)
@@ -160,9 +145,9 @@ class RESTApi:
self.resource_paths = ResourcePaths.for_catalog_properties(options)
def __build_paged_query_params(
- max_results: Optional[int],
- page_token: Optional[str],
- name_patterns: Dict[str, str],
+ max_results: Optional[int],
+ page_token: Optional[str],
+ name_patterns: Dict[str, str],
) -> Dict[str, str]:
query_params = {}
if max_results is not None and max_results > 0:
@@ -178,7 +163,7 @@ class RESTApi:
return query_params
def __list_data_from_page_api(
- self, page_api: Callable[[Dict[str, str]], PagedResponse[T]]
+ self, page_api: Callable[[Dict[str, str]], PagedResponse[T]]
) -> List[T]:
results = []
query_params = {}
@@ -216,10 +201,10 @@ class RESTApi:
)
def list_databases_paged(
- self,
- max_results: Optional[int] = None,
- page_token: Optional[str] = None,
- database_name_pattern: Optional[str] = None,
+ self,
+ max_results: Optional[int] = None,
+ page_token: Optional[str] = None,
+ database_name_pattern: Optional[str] = None,
) -> PagedList[str]:
response = self.client.get_with_params(
@@ -255,10 +240,10 @@ class RESTApi:
self.rest_auth_function)
def alter_database(
- self,
- name: str,
- removals: Optional[List[str]] = None,
- updates: Optional[Dict[str, str]] = None,
+ self,
+ name: str,
+ removals: Optional[List[str]] = None,
+ updates: Optional[Dict[str, str]] = None,
):
if not name or not name.strip():
raise ValueError("Database name cannot be empty")
@@ -282,11 +267,11 @@ class RESTApi:
)
def list_tables_paged(
- self,
- database_name: str,
- max_results: Optional[int] = None,
- page_token: Optional[str] = None,
- table_name_pattern: Optional[str] = None,
+ self,
+ database_name: str,
+ max_results: Optional[int] = None,
+ page_token: Optional[str] = None,
+ table_name_pattern: Optional[str] = None,
) -> PagedList[str]:
response = self.client.get_with_params(
self.resource_paths.tables(database_name),
diff --git a/paimon-python/pypaimon/api/auth.py
b/paimon-python/pypaimon/api/auth.py
index 83e0e44c91..b30c1ec025 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -19,36 +19,14 @@ import base64
import hashlib
import hmac
import logging
-import threading
+import time
from abc import ABC, abstractmethod
from collections import OrderedDict
-from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Dict
-
-@dataclass
-class RESTAuthParameter:
- method: str
- path: str
- data: str
- parameters: Dict[str, str]
-
-
-@dataclass
-class DLFToken:
- access_key_id: str
- access_key_secret: str
- security_token: Optional[str] = None
-
- def __init__(self, options: Dict[str, str]):
- from . import RESTCatalogOptions
-
- self.access_key_id = options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID)
- self.access_key_secret = options.get(
- RESTCatalogOptions.DLF_ACCESS_KEY_SECRET)
- self.security_token = options.get(
- RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
+from .token_loader import DLFTokenLoader, DLFToken
+from .typedef import RESTAuthParameter
class AuthProvider(ABC):
@@ -92,11 +70,33 @@ class DLFAuthProvider(AuthProvider):
AUTH_DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
MEDIA_TYPE = "application/json"
- def __init__(self, token: DLFToken, region: str):
+ TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000
+
+ token: DLFToken
+ token_loader: DLFTokenLoader
+ region: str
+
+ def __init__(self,
+ region: str,
+ token: DLFToken = None,
+ token_loader: DLFTokenLoader = None):
self.logger = logging.getLogger(self.__class__.__name__)
+ if token is None and token_loader is None:
+ raise ValueError("Either token or token_loader must be provided")
self.token = token
+ self.token_loader = token_loader
self.region = region
- self._lock = threading.Lock()
+
+ def get_token(self):
+ if self.token_loader is not None:
+ if self.token is None:
+ self.token = self.token_loader.load_token()
+ elif self.token is not None and self.token.expiration_at_millis is
not None:
+ if self.token.expiration_at_millis - int(time.time() * 1000) <
self.TOKEN_EXPIRATION_SAFE_TIME_MILLIS:
+ self.token = self.token_loader.load_token()
+ if self.token is None:
+ raise ValueError("Either token or token_loader must be provided")
+ return self.token
def merge_auth_header(
self, base_header: Dict[str, str], rest_auth_parameter:
RESTAuthParameter
@@ -109,12 +109,14 @@ class DLFAuthProvider(AuthProvider):
date = date_time[:8]
sign_headers = self.generate_sign_headers(
- rest_auth_parameter.data, date_time, self.token.security_token
+ rest_auth_parameter.data,
+ date_time,
+ self.get_token().security_token
)
authorization = DLFAuthSignature.get_authorization(
rest_auth_parameter=rest_auth_parameter,
- dlf_token=self.token,
+ dlf_token=self.get_token(),
region=self.region,
headers=sign_headers,
date_time=date_time,
diff --git a/paimon-python/pypaimon/api/client.py
b/paimon-python/pypaimon/api/client.py
index a7c69cb31d..d83f79a202 100644
--- a/paimon-python/pypaimon/api/client.py
+++ b/paimon-python/pypaimon/api/client.py
@@ -27,7 +27,7 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
-from .auth import RESTAuthParameter
+from .typedef import RESTAuthParameter
from .api_response import ErrorResponse
from .rest_json import JSON
@@ -228,17 +228,20 @@ class DefaultErrorHandler(ErrorHandler):
raise RESTException("Unable to process: %s", message)
-class ExponentialRetryInterceptor:
+class ExponentialRetry:
+
+ adapter: HTTPAdapter
def __init__(self, max_retries: int = 5):
- self.max_retries = max_retries
- self.logger = logging.getLogger(self.__class__.__name__)
+ retry = self.__create_retry_strategy(max_retries)
+ self.adapter = HTTPAdapter(max_retries=retry)
- def create_retry_strategy(self) -> Retry:
+ @staticmethod
+ def __create_retry_strategy(max_retries: int) -> Retry:
retry_kwargs = {
- 'total': self.max_retries,
- 'read': self.max_retries,
- 'connect': self.max_retries,
+ 'total': max_retries,
+ 'read': max_retries,
+ 'connect': max_retries,
'backoff_factor': 1,
'status_forcelist': [429, 502, 503, 504],
'raise_on_status': False,
@@ -365,9 +368,8 @@ class HttpClient(RESTClient):
self.session = requests.Session()
- retry_interceptor = ExponentialRetryInterceptor(max_retries=1)
- retry_strategy = retry_interceptor.create_retry_strategy()
- adapter = HTTPAdapter(max_retries=retry_strategy)
+ retry_interceptor = ExponentialRetry(max_retries=3)
+ adapter = HTTPAdapter(max_retries=retry_interceptor.adapter)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
diff --git a/paimon-python/pypaimon/api/token_loader.py
b/paimon-python/pypaimon/api/token_loader.py
new file mode 100644
index 0000000000..46810a1121
--- /dev/null
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -0,0 +1,215 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timezone
+from dataclasses import dataclass
+from typing import Optional, Dict
+from urllib.parse import urljoin
+import requests
+from requests.adapters import HTTPAdapter
+from requests.exceptions import RequestException
+
+from .rest_json import json_field, JSON
+from .typedef import RESTCatalogOptions
+from .client import ExponentialRetry
+
+
+@dataclass
+class DLFToken:
+ TOKEN_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+
+ access_key_id: str = json_field('AccessKeyId')
+ access_key_secret: str = json_field('AccessKeySecret')
+ security_token: Optional[str] = json_field('SecurityToken')
+ expiration: Optional[str] = json_field('Expiration')
+ expiration_at_millis: Optional[int] = json_field('ExpirationAt',
default=None)
+
+ @staticmethod
+ def parse_expiration_to_millis(expiration: str) -> int:
+ date_time = datetime.strptime(expiration, DLFToken.TOKEN_DATE_FORMAT)
+ utc_datetime = date_time.replace(tzinfo=timezone.utc)
+ expiration_at_millis = int(utc_datetime.timestamp() * 1000)
+ return expiration_at_millis
+
+ def __init__(self, access_key_id: str, access_key_secret: str,
+ security_token: str, expiration: str = None,
expiration_at_millis: int = None):
+ self.access_key_id = access_key_id
+ self.access_key_secret = access_key_secret
+ self.security_token = security_token
+ self.expiration = expiration
+ if expiration_at_millis is not None:
+ self.expiration_at_millis = expiration_at_millis
+ if expiration is not None:
+ self.expiration_at_millis =
self.parse_expiration_to_millis(expiration)
+
+ @classmethod
+ def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
+ from .typedef import RESTCatalogOptions
+ if (options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID) is None
+ or options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET) is
None):
+ return None
+ else:
+ return cls(
+
access_key_id=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_ID),
+
access_key_secret=options.get(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET),
+
security_token=options.get(RESTCatalogOptions.DLF_ACCESS_SECURITY_TOKEN)
+ )
+
+
+class DLFTokenLoader(ABC):
+
+ @abstractmethod
+ def load_token(self) -> DLFToken:
+ pass
+
+ @abstractmethod
+ def description(self) -> str:
+ pass
+
+
+class HTTPClient:
+ """HTTP client with retry and timeout configuration"""
+
+ def __init__(self, connect_timeout: int = 180, read_timeout: int = 180,
+ max_retries: int = 3):
+ self.connect_timeout = connect_timeout
+ self.read_timeout = read_timeout
+ self.session = requests.Session()
+
+ # Add retry adapter
+ retry_interceptor = ExponentialRetry(max_retries=3)
+ adapter = HTTPAdapter(max_retries=retry_interceptor.adapter)
+
+ self.session.mount("http://", adapter)
+ self.session.mount("https://", adapter)
+
+ # Set timeouts
+ self.session.timeout = (connect_timeout, read_timeout)
+
+ def get(self, url: str, **kwargs) -> requests.Response:
+ """Make GET request with configured timeouts and retries"""
+ return self.session.get(url, timeout=(self.connect_timeout,
self.read_timeout), **kwargs)
+
+ def close(self):
+ """Close the session"""
+ self.session.close()
+
+
+class DLFECSTokenLoader(DLFTokenLoader):
+ """
+ DLF ECS Token Loader implementation
+
+ This class loads DLF tokens from ECS metadata service.
+ """
+
+ # Class-level HTTP client (equivalent to static in Java)
+ _http_client: Optional[HTTPClient] = None
+
+ @classmethod
+ def get_http_client(cls) -> HTTPClient:
+ """Get or create the shared HTTP client"""
+ if cls._http_client is None:
+ cls._http_client = HTTPClient(
+ connect_timeout=180, # 3 minutes
+ read_timeout=180, # 3 minutes
+ max_retries=3
+ )
+ return cls._http_client
+
+ def __init__(self, ecs_metadata_url: str, role_name: Optional[str] = None):
+ """
+ Initialize DLF ECS Token Loader
+
+ Args:
+ ecs_metadata_url: ECS metadata service URL
+ role_name: Optional role name. If None, will be fetched from
metadata service
+ """
+ self.ecs_metadata_url = ecs_metadata_url
+ self.role_name = role_name
+
+ def load_token(self) -> DLFToken:
+ try:
+ if self.role_name is None:
+ self.role_name = self._get_role(self.ecs_metadata_url)
+
+ token_url = urljoin(self.ecs_metadata_url.rstrip('/') + '/',
self.role_name)
+ return self._get_token(token_url)
+
+ except Exception as e:
+ raise RuntimeError(f"Token loading failed: {e}") from e
+
+ def description(self) -> str:
+ return self.ecs_metadata_url
+
+ def _get_role(self, url: str) -> str:
+ try:
+ return self._get_response_body(url)
+ except Exception as e:
+ raise RuntimeError(f"Get role failed, error: {e}") from e
+
+ def _get_token(self, url: str) -> DLFToken:
+ try:
+ token_json = self._get_response_body(url)
+ return JSON.from_json(token_json, DLFToken)
+ except OSError as e:
+ # Python equivalent of UncheckedIOException
+ raise OSError(f"IO error while getting token: {e}") from e
+ except Exception as e:
+ raise RuntimeError(f"Get token failed, error: {e}") from e
+
+ def _get_response_body(self, url: str) -> str:
+ try:
+ http_client = self.get_http_client()
+ response = http_client.get(url)
+
+ if response is None:
+ raise RuntimeError("Get response failed, response is None")
+
+ if not response.ok:
+ raise RuntimeError(f"Get response failed, response:
{response.status_code} {response.reason}")
+
+ response_body = response.text
+ if response_body is None:
+ raise RuntimeError("Get response failed, response body is
None")
+ return response_body
+
+ except RuntimeError:
+ # Re-raise RuntimeError as-is
+ raise
+ except RequestException as e:
+ raise RuntimeError(f"Request failed: {e}") from e
+ except Exception as e:
+ raise RuntimeError(f"Get response failed, error: {e}") from e
+
+
+# Factory and utility functions
+class DLFTokenLoaderFactory:
+ """Factory for creating DLF token loaders"""
+
+ @staticmethod
+ def create_token_loader(options: Dict[str, str]) ->
Optional['DLFTokenLoader']:
+ """Create ECS token loader"""
+ loader = options.get(RESTCatalogOptions.DLF_TOKEN_LOADER)
+ if loader == 'ecs':
+ ecs_metadata_url = options.get(
+ RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL,
+
'http://100.100.100.200/latest/meta-data/Ram/security-credentials/'
+ )
+ role_name = options.get(RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME)
+ return DLFECSTokenLoader(ecs_metadata_url, role_name)
+ return None
diff --git a/paimon-python/pypaimon/api/typedef.py
b/paimon-python/pypaimon/api/typedef.py
index 9c79e48323..4cf65738f9 100644
--- a/paimon-python/pypaimon/api/typedef.py
+++ b/paimon-python/pypaimon/api/typedef.py
@@ -16,7 +16,7 @@
# under the License.
from dataclasses import dataclass
-from typing import Optional, TypeVar
+from typing import Optional, TypeVar, Dict
T = TypeVar("T")
@@ -61,4 +61,26 @@ class Identifier:
return self.branch_name
def is_system_table(self) -> bool:
- return self.object_name.startswith("$")
+ return self.object_name.startswith('$')
+
+
+@dataclass
+class RESTAuthParameter:
+ method: str
+ path: str
+ data: str
+ parameters: Dict[str, str]
+
+
+class RESTCatalogOptions:
+ URI = "uri"
+ WAREHOUSE = "warehouse"
+ TOKEN_PROVIDER = "token.provider"
+ DLF_REGION = "dlf.region"
+ DLF_ACCESS_KEY_ID = "dlf.access-key-id"
+ DLF_ACCESS_KEY_SECRET = "dlf.access-key-secret"
+ DLF_ACCESS_SECURITY_TOKEN = "dlf.security-token"
+ DLF_TOKEN_LOADER = "dlf.token-loader"
+ DLF_TOKEN_ECS_ROLE_NAME = "dlf.token-ecs-role-name"
+ DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
+ PREFIX = 'prefix'
diff --git a/paimon-python/pypaimon/tests/api_test.py
b/paimon-python/pypaimon/tests/api_test.py
index 0150c4919e..d8b872c33f 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -26,10 +26,12 @@ from urllib.parse import urlparse
import unittest
import pypaimon.api as api
-from ..api.api_response import (ConfigResponse, ListDatabasesResponse,
GetDatabaseResponse, TableMetadata, Schema,
- GetTableResponse, ListTablesResponse,
TableSchema, RESTResponse, PagedList, DataField)
+from ..api.api_response import (ConfigResponse, ListDatabasesResponse,
GetDatabaseResponse,
+ TableMetadata, Schema, GetTableResponse,
ListTablesResponse,
+ TableSchema, RESTResponse, PagedList,
DataField)
from ..api import RESTApi
from ..api.rest_json import JSON
+from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
from ..api.typedef import Identifier
from ..api.data_types import AtomicInteger, DataTypeParser, AtomicType,
ArrayType, MapType, RowType
@@ -240,7 +242,8 @@ OBJECT_TABLE = "OBJECT_TABLE"
class RESTCatalogServer:
"""Mock REST server for testing"""
- def __init__(self, data_path: str, auth_provider, config: ConfigResponse,
warehouse: str):
+ def __init__(self, data_path: str, auth_provider, config: ConfigResponse,
warehouse: str,
+ role_name: str = None, token_json: str = None):
self.logger = logging.getLogger(__name__)
self.warehouse = warehouse
self.config_response = config
@@ -259,6 +262,8 @@ class RESTCatalogServer:
# Initialize mock catalog (simplified)
self.data_path = data_path
self.auth_provider = auth_provider
+ self.role_name = role_name
+ self.token_json = token_json
# HTTP server setup
self.server = None
@@ -375,6 +380,13 @@ class RESTCatalogServer:
if warehouse_param == self.warehouse:
return self._mock_response(self.config_response, 200)
+ # ecs role
+ if resource_path == '/ram/security-credential/':
+ return self._mock_response(self.role_name, 200)
+
+ if resource_path == f'/ram/security-credential/{self.role_name}':
+ return self._mock_response(self.token_json, 200)
+
# Databases endpoint
if resource_path == self.database_uri or
resource_path.startswith(self.database_uri + "?"):
return self._databases_api_handler(method, data, parameters)
@@ -462,7 +474,6 @@ class RESTCatalogServer:
if len(path_parts) == 3:
# Basic table operations
return self._table_handle(method, data, identifier)
-
return self._mock_response(ErrorResponse(None, None, "Not Found",
404), 404)
def _databases_api_handler(self, method: str, data: str,
@@ -861,7 +872,8 @@ class ApiTestCase(unittest.TestCase):
DataField(0, "name", AtomicType('INT'), 'desc name'),
DataField(1, "arr11", ArrayType(True, AtomicType('INT')),
'desc arr11'),
DataField(2, "map11", MapType(False, AtomicType('INT'),
- MapType(False,
AtomicType('INT'), AtomicType('INT'))), 'desc arr11'),
+ MapType(False,
AtomicType('INT'), AtomicType('INT'))),
+ 'desc arr11'),
]
schema = TableSchema(len(data_fields), data_fields,
len(data_fields), [], [], {}, "")
test_tables = {
@@ -888,6 +900,50 @@ class ApiTestCase(unittest.TestCase):
server.shutdown()
print("Server stopped")
-
-if __name__ == "__main__":
- unittest.main()
+ def test_ecs_loader_token(self):
+ token = DLFToken(
+ access_key_id='AccessKeyId',
+ access_key_secret='AccessKeySecret',
+ security_token='AQoDYXdzEJr...<remainder of security token>',
+ expiration="2023-12-01T12:00:00Z"
+ )
+ token_json = JSON.to_json(token)
+ role_name = 'test_role'
+ config = ConfigResponse(defaults={"prefix": "mock-test"})
+ server = RESTCatalogServer(
+ data_path="/tmp/test_warehouse",
+ auth_provider=None,
+ config=config,
+ warehouse="test_warehouse",
+ role_name=role_name,
+ token_json=token_json
+ )
+ try:
+ # Start server
+ server.start()
+ ecs_metadata_url =
f"http://localhost:{server.port}/ram/security-credential/"
+ options = {
+ api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+ api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL:
ecs_metadata_url
+ }
+ loader = DLFTokenLoaderFactory.create_token_loader(options)
+ load_token = loader.load_token()
+ self.assertEqual(load_token.access_key_id, token.access_key_id)
+ self.assertEqual(load_token.access_key_secret,
token.access_key_secret)
+ self.assertEqual(load_token.security_token, token.security_token)
+ self.assertEqual(load_token.expiration, token.expiration)
+ options_with_role = {
+ api.RESTCatalogOptions.DLF_TOKEN_LOADER: 'ecs',
+ api.RESTCatalogOptions.DLF_TOKEN_ECS_METADATA_URL:
ecs_metadata_url,
+ api.RESTCatalogOptions.DLF_TOKEN_ECS_ROLE_NAME: role_name,
+ }
+ loader =
DLFTokenLoaderFactory.create_token_loader(options_with_role)
+ token = loader.load_token()
+ self.assertEqual(load_token.access_key_id, token.access_key_id)
+ self.assertEqual(load_token.access_key_secret,
token.access_key_secret)
+ self.assertEqual(load_token.security_token, token.security_token)
+ self.assertEqual(load_token.expiration, token.expiration)
+ finally:
+ # Shutdown server
+ server.shutdown()
+ print("Server stopped")