This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f259ea36f7 [python] optimize file io retry for OSS and S3 (#6919)
f259ea36f7 is described below
commit f259ea36f7c49b6b83828628e43c6ba184b80cc7
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Dec 30 10:20:16 2025 +0800
[python] optimize file io retry for OSS and S3 (#6919)
---
paimon-python/pypaimon/common/file_io.py | 31 +++++++++++++++++++++++++++++++
1 file changed, 31 insertions(+)
diff --git a/paimon-python/pypaimon/common/file_io.py
b/paimon-python/pypaimon/common/file_io.py
index 1ba9caa791..497d711a49 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -63,6 +63,31 @@ class FileIO:
else:
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
@staticmethod
def _create_s3_retry_config(
    max_attempts: int = 10,
    request_timeout: int = 60,
    connect_timeout: int = 60
) -> Dict[str, Any]:
    """Build keyword arguments that enable retries/timeouts on S3FileSystem.

    AwsStandardS3RetryStrategy and the timeout parameters are only
    available in PyArrow >= 8.0.0; on older versions an empty dict is
    returned so the filesystem falls back to its built-in defaults.
    """
    # Older PyArrow exposes neither the timeout nor the retry knobs.
    if parse(pyarrow.__version__) < parse("8.0.0"):
        return {}

    kwargs: Dict[str, Any] = {
        'request_timeout': request_timeout,
        'connect_timeout': connect_timeout,
    }
    try:
        from pyarrow.fs import AwsStandardS3RetryStrategy
    except ImportError:
        # Defensive: if the strategy class is somehow unavailable despite
        # the version check, still pass the timeout settings through.
        pass
    else:
        kwargs['retry_strategy'] = AwsStandardS3RetryStrategy(
            max_attempts=max_attempts)
    return kwargs

def _extract_oss_bucket(self, location) -> str:
uri = urlparse(location)
if uri.scheme and uri.scheme != "oss":
@@ -104,6 +129,9 @@ class FileIO:
client_kwargs['endpoint_override'] = (oss_bucket + "." +
self.properties.get(OssOptions.OSS_ENDPOINT))
+ retry_config = self._create_s3_retry_config()
+ client_kwargs.update(retry_config)
+
return S3FileSystem(**client_kwargs)
def _initialize_s3_fs(self) -> FileSystem:
@@ -118,6 +146,9 @@ class FileIO:
"force_virtual_addressing": True,
}
+ retry_config = self._create_s3_retry_config()
+ client_kwargs.update(retry_config)
+
return S3FileSystem(**client_kwargs)
def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) ->
FileSystem: