grinyael86 opened a new issue, #3291:
URL: https://github.com/apache/iceberg-python/issues/3291
### Question
I’m using pyiceberg v0.11.1 to write DataFrames in a setup that includes
Lakekeeper, Iceberg, and MinIO.
When writing a single file, everything works correctly and performance is
very good. However, issues start to appear when running tests with pytest-xdist
using multiple workers.
With a small number of tests, everything still passes. But when I run around
15 tests in parallel, most of them complete successfully and write to storage
as expected. The problem occurs toward the end of the run—some of the remaining
tests fail during the write process.
> pyiceberg.exceptions.SignError: Failed to sign request 412: {'method':
'PUT', 'region': 'us-east-1', 'uri': '<minIO url>
/my_warehouse/019dca4b-f5bc-71b3-9ccb-a54d0c1a7f87/019dca4c-45b7-7d03-b1b9-f9c899b8db5c/metadata/62127deb-4d73-46a7-bec5-3b9bfc6c6566-m0.avro',
'headers': {'User-Agent': ['aiobotocore/3.3.0 md/Botocore#1.42.70 ua/2.1
os/linux#4.18.0-348.7.1.el8_5.x86_64 md/arch#x86_64 lang/python#3.14.3
md/pyimpl#CPython m/N,D,c,a cfg/retry-mode#legacy botocore/1.42.70'], 'Expect':
['100-continue']}}
class IcebergConnProperties(CatalogConnProperties):
    """Connection settings common to every Iceberg REST catalog backend.

    Attributes:
        conn_type: Catalog type handed to PyIceberg (e.g. ``"rest"``).
        catalog_uri: URL of the REST catalog endpoint.
        token: Bearer token for the REST catalog; pass ``None`` at
            construction time, it is filled in at runtime via
            ``get_access_token``.
        warehouse: Identifier of the Iceberg warehouse registered in the
            catalog.
        ssl_verify: ``"true"`` / ``"false"`` — whether TLS certificates are
            verified.
        extra: Free-form key/value pairs passed through unchanged to
            ``pyiceberg.catalog.load_catalog``.
    """

    conn_type: str
    catalog_uri: str
    token: str | None
    warehouse: str
    ssl_verify: str = "true"
    extra: dict[str, str] = field(default_factory=dict)

    def to_catalog_config(self) -> dict[str, str | None]:
        """Build the keyword arguments for ``pyiceberg.catalog.load_catalog``."""
        config: dict[str, str | None] = {}
        config["type"] = self.conn_type
        config["uri"] = self.catalog_uri
        config["token"] = self.token
        config["warehouse"] = self.warehouse
        config["ssl.verify"] = self.ssl_verify
        config["header.X-Iceberg-Access-Delegation"] = "vended-credentials"
        # ``extra`` is merged last, so its entries win on key collisions.
        config.update(self.extra)
        return config
def upload_dataframe_to_storage(
    self,
    dataframe: pl.DataFrame,
    schema: str,
    table_name: str,
) -> None:
    """Write a Polars DataFrame to a freshly created Iceberg table.

    Any existing table with the same identifier is dropped first, so the
    table is always recreated from the Arrow schema of ``dataframe`` and
    afterwards holds only the rows written by this call. If anything fails
    after the table was created, that table is dropped again (best effort)
    and the original exception is re-raised.

    NOTE(review): the check / drop / create sequence is not atomic; callers
    running in parallel against the same ``schema.table_name`` can interfere
    with each other.

    Args:
        dataframe: Data to persist.
        schema: Iceberg namespace / Trino schema that will own the table.
        table_name: Unqualified table name within ``schema``.

    Raises:
        Exception: Whatever the catalog calls or the write raised; it is
            logged and propagated unchanged.
    """
    table_just_created = False
    table_id = f"{schema}.{table_name}"
    catalog = self._connect_catalog(self._conn_properties)
    arrow_table = dataframe.to_arrow()
    try:
        if catalog.table_exists(table_id):
            catalog.drop_table(table_id)
            self.logger.debug(f"Dropped existing table '{table_id}' before recreating")
        else:
            self.logger.debug(f"Table '{table_id}' not found — creating from Arrow schema")
            # Previously this step was only logged; actually make sure the
            # namespace exists so create_table cannot fail on a missing one.
            catalog.create_namespace_if_not_exists(schema)
            self.logger.debug(f"Created namespace if not exists '{schema}'")
        iceberg_table = catalog.create_table(
            identifier=table_id,
            schema=arrow_table.schema,
        )
        self.logger.debug(f"Created table '{table_id}'")
        table_just_created = True
        iceberg_table.append(arrow_table)
        self.logger.debug(f"Wrote {len(dataframe)} rows to '{table_id}'")
        self.logger.debug(f"Table '{table_id}' appended")
    except Exception:
        self.logger.exception(f"Failed to append {len(dataframe)} rows to '{table_id}'")
        if table_just_created:
            self.logger.debug(f"Dropping empty table '{table_id}' created in this call")
            try:
                catalog.drop_table(table_id)
            except Exception:
                # Cleanup is best effort: a failing drop must not replace
                # the original error that is re-raised below.
                self.logger.exception(f"Failed to drop table '{table_id}' during cleanup")
        raise
    finally:
        self.logger.debug("Deleted arrow table from memory")
        # Drops only this function's reference to the Arrow table.
        del arrow_table
def _connect_catalog(self, props: CatalogConnProperties) -> Catalog:
    """Connect to the Iceberg REST catalog.

    Builds the keyword arguments from ``props.to_catalog_config()``. When
    ``Config.Trino.USE_JWT_AUTH`` is truthy, the ``"token"`` entry is
    replaced with the value returned by ``get_access_token()``. The catalog
    is then loaded under ``self._CATALOG_NAME``.

    Args:
        props: Connection properties exposing ``to_catalog_config()``.

    Returns:
        The catalog object returned by ``load_catalog``.
    """
    self.logger.debug("Loading Iceberg REST catalog")
    catalog_kwargs = props.to_catalog_config()
    if Config.Trino.USE_JWT_AUTH:
        catalog_kwargs["token"] = get_access_token()
    # Never write secrets to the log: mask the values of credential-like
    # keys (bearer token, S3 secrets, passwords) in the logged copy only.
    sensitive_markers = ("token", "secret", "credential", "password")
    loggable_kwargs = {
        key: (
            "***"
            if value is not None
            and any(marker in key.lower() for marker in sensitive_markers)
            else value
        )
        for key, value in catalog_kwargs.items()
    }
    self.logger.debug(f"Catalog kwargs: {loggable_kwargs}")
    return load_catalog(name=self._CATALOG_NAME, **catalog_kwargs)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]