arul-cc opened a new issue, #2120:
URL: https://github.com/apache/iceberg-python/issues/2120
### Apache Iceberg version
0.9.1 (latest release)
### Please describe the bug 🐞
## Problem Description
While performing parallel upsert operations on an Iceberg table, the table
metadata appears to get corrupted, making the table inaccessible even though
the physical data files still exist. The issue occurs when multiple upsert
operations are attempted concurrently.
## Expected Behavior
- Parallel upsert operations should either:
  - Succeed with proper ACID transaction handling, or
  - Fail gracefully without corrupting table metadata
## Actual Behavior
1. The first upsert operation succeeds
2. Subsequent operations fail with `CommitFailedException` (expected, given Iceberg's optimistic concurrency; see the retry sketch below)
3. With more parallel calls, the table becomes inaccessible with "table not found" errors
4. Physical data files remain intact, suggesting metadata corruption
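For reference, this is the kind of retry I would expect to make failure mode 2 recoverable — a minimal sketch of my assumption, not code from the failing run; the `upsert_with_retry` helper, its retry count, and the backoff are mine:
```python
import time
from pyiceberg.exceptions import CommitFailedException

def upsert_with_retry(catalog, identifier, df, max_retries=5):
    """Reload the table and retry the upsert after an optimistic-concurrency conflict."""
    for attempt in range(max_retries):
        table = catalog.load_table(identifier)  # pick up the latest committed metadata
        try:
            return table.upsert(df)
        except CommitFailedException:
            time.sleep(0.1 * (attempt + 1))  # small backoff before retrying
    raise RuntimeError(f"upsert on {identifier} still failing after {max_retries} retries")
```
In the runs below I only had the commented-out retry visible in `perform_upsert`.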
## Reproduction Steps
1. Initialize the Iceberg table with sample data using the `upload_file` code below
2. Attempt parallel upsert operations using the `perform_upsert` function with multiple workers
3. Observe that the table becomes inaccessible after several attempts
## Code Samples
### Table Initialization Code
```python
import os
import uuid
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from minio import Minio
from minio.error import S3Error
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType,
DateType, TimestampType, TimeType, BinaryType, DecimalType, FixedType,
ListType, MapType, StructType, NestedField
)
from pyiceberg.expressions import EqualTo
from pyiceberg.table import Table
from io import BytesIO
from typing import List, Dict, Union, Optional
from datetime import datetime, date
import logging
# logging.basicConfig(level=logging.DEBUG)
class IcebergMinIOUtils:
def __init__(self):
"""Initialize MinIO and Iceberg clients."""
# MinIO client
self.minio_client = Minio(
endpoint="minio:9000",
access_key="admin",
secret_key="password",
secure=False
)
self.bucket_name = "warehouse"
# Iceberg catalog
self.catalog = load_catalog(
"default",
**{
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.path-style-access": "true",
}
)
# Size cap from environment variable (in MB)
        self.max_download_size_mb = float(os.getenv("MAX_DOWNLOAD_SIZE_MB", 100))
# Metadata columns definition
        self.metadata_columns = [
            NestedField(field_id=1000, name="created_at", field_type=TimestampType(), required=False),
            NestedField(field_id=1001, name="source", field_type=StringType(), required=False)
        ]
    def _is_iceberg_format(self, data: Union[str, pd.DataFrame, pa.Table, bytes]) -> bool:
        """Check if data is in a format suitable for Iceberg (JSON, Parquet, CSV, DataFrame)."""
if isinstance(data, pd.DataFrame) or isinstance(data, pa.Table):
return True
if isinstance(data, str):
ext = os.path.splitext(data)[1].lower()
return ext in [".json", ".parquet", ".csv"]
return False
    def _infer_schema_from_data(self, data: Union[pd.DataFrame, pa.Table]) -> List[NestedField]:
        """Infer Iceberg schema fields from a Pandas DataFrame or PyArrow Table, including JSON/dict types."""
        if isinstance(data, pd.DataFrame):
            # Convert object columns containing dicts or lists to PyArrow StructType or ListType
            for col in data.columns:
                if data[col].dtype == "object":
                    sample = next((x for x in data[col].dropna() if x is not None), None)
                    if isinstance(sample, dict):
                        # Infer struct fields from dict
                        struct_fields = [
                            pa.field(k, self._infer_pyarrow_type(v)) for k, v in sample.items()
                        ]
                        data[col] = data[col].apply(lambda x: x if isinstance(x, dict) else None)
                        data[col] = pa.array(data[col], pa.struct(struct_fields))
                    elif isinstance(sample, list):
                        # Infer list element type from first non-empty list
                        element_sample = next((x for l in data[col].dropna() for x in l if l), None)
                        element_type = self._infer_pyarrow_type(element_sample) if element_sample else pa.string()
                        data[col] = data[col].apply(lambda x: x if isinstance(x, list) else None)
                        data[col] = pa.array(data[col], pa.list_(element_type))
            data = pa.Table.from_pandas(data, preserve_index=False)
        fields = []
        for i, field in enumerate(data.schema):
            field_type = self._map_pyarrow_to_iceberg(field.type)
            required = False
            if field.name == "id":
                required = True
            fields.append(NestedField(field_id=i + 1, name=field.name, field_type=field_type, required=required))
        return fields
    def _convert_data_into_iceberg_supported_format(self, data: Union[pd.DataFrame, pa.Table]):
        if isinstance(data, pa.Table):
            data = data.to_pandas()
        for col_name in data.columns:
            col_type = data[col_name].dtype
            if pd.api.types.is_datetime64_ns_dtype(col_type):
                print(f"Converting column '{col_name}' from nanosecond timestamp to microsecond timestamp.")
                data[col_name] = data[col_name].astype('datetime64[us]')
        return pa.Table.from_pandas(data)
def _infer_pyarrow_type(self, value):
"""Infer PyArrow type from a Python value."""
if isinstance(value, str):
return pa.string()
elif isinstance(value, int):
return pa.int64()
elif isinstance(value, float):
return pa.float64()
elif isinstance(value, bool):
return pa.bool_()
elif isinstance(value, datetime):
return pa.timestamp("ns")
elif isinstance(value, date):
return pa.date32()
elif isinstance(value, dict):
            struct_fields = [pa.field(k, self._infer_pyarrow_type(v)) for k, v in value.items()]
            return pa.struct(struct_fields)
        elif isinstance(value, list):
            element_sample = next((x for x in value if x is not None), None)
            element_type = self._infer_pyarrow_type(element_sample) if element_sample else pa.string()
return pa.list_(element_type)
else:
return pa.string() # Fallback for unsupported types
def _map_pyarrow_to_iceberg(self, pa_type: pa.DataType) -> "IcebergType":
"""Map PyArrow types to Iceberg types, supporting all types from
PyIceberg documentation."""
if pa.types.is_string(pa_type) or pa.types.is_large_string(pa_type):
return StringType()
elif pa.types.is_int8(pa_type) or pa.types.is_int16(pa_type) or
pa.types.is_int32(pa_type):
return IntegerType()
elif pa.types.is_int64(pa_type):
return LongType()
elif pa.types.is_float16(pa_type) or pa.types.is_float32(pa_type):
return FloatType()
elif pa.types.is_float64(pa_type):
return DoubleType()
elif pa.types.is_boolean(pa_type):
return BooleanType()
elif pa.types.is_date32(pa_type) or pa.types.is_date64(pa_type):
return DateType()
elif pa.types.is_timestamp(pa_type):
return TimestampType()
elif pa.types.is_time32(pa_type) or pa.types.is_time64(pa_type):
return TimeType()
        elif pa.types.is_binary(pa_type) or pa.types.is_large_binary(pa_type):
            return BinaryType()
        elif pa.types.is_fixed_size_binary(pa_type):
            return FixedType(length=pa_type.byte_width)
        elif pa.types.is_decimal(pa_type):
            return DecimalType(precision=pa_type.precision, scale=pa_type.scale)
elif pa.types.is_list(pa_type):
element_type = self._map_pyarrow_to_iceberg(pa_type.value_type)
            return ListType(element_id=0, element_type=element_type, element_required=False)
elif pa.types.is_struct(pa_type):
struct_fields = [
NestedField(
field_id=i+1,
name=field.name,
field_type=self._map_pyarrow_to_iceberg(field.type),
required=False
)
for i, field in enumerate(pa_type)
]
return StructType(fields=struct_fields)
elif pa.types.is_map(pa_type):
key_type = self._map_pyarrow_to_iceberg(pa_type.key_type)
value_type = self._map_pyarrow_to_iceberg(pa_type.item_type)
            return MapType(key_id=0, key_type=key_type, value_id=1, value_type=value_type, value_required=False)
else:
# raise ValueError(f"Unsupported PyArrow type: {pa_type}")
return StringType()
#
def _create_table_if_not_exists(
self,
table_name: str,
namespace: str = "default",
data: Optional[Union[pd.DataFrame, pa.Table]] = None,
autogenerate_id: bool = False,
generate_meta_columns: bool = False,
index_columns: Optional[List[str]] = None
) -> Table:
"""Create Iceberg table if it doesn't exist with optional id and
metadata columns."""
identifier = f"{namespace}.{table_name}"
        try:
            # The caller unpacks (table, data), so return a tuple in the fast path as well
            return self.catalog.load_table(identifier), data
        except Exception:
            # Infer schema from data if provided
            schema_fields = []
            if data is not None:
                data = self._convert_data_into_iceberg_supported_format(data)
                schema_fields.extend(self._infer_schema_from_data(data))
field_id_max = len(schema_fields)
meta_columns = []
if generate_meta_columns:
                meta_column_schemas = [
                    NestedField(field_id=field_id_max + 1, name="recordguid__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 2, name="recordstatus__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 3, name="created_by__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 4, name="owner__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 5, name="last_updated_by__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 6, name="created_at__", field_type=TimestampType(), required=False),
                    NestedField(field_id=field_id_max + 7, name="last_updated_at__", field_type=TimestampType(), required=False),
                    NestedField(field_id=field_id_max + 8, name="tags__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 9, name="user_actions__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 10, name="proposals__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 11, name="link_data__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 12, name="related_data__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 13, name="signal__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 14, name="exceptions__", field_type=StringType(), required=False),
                    NestedField(field_id=field_id_max + 15, name="remediation__", field_type=StringType(), required=False),
                ]
schema_fields.extend(meta_column_schemas)
data_columns = set(data.schema.names)
print('data_columns :',data_columns)
num_rows = data.num_rows
new_columns = []
new_column_names = []
                for meta_column_schema in meta_column_schemas:
                    if meta_column_schema.name not in data_columns:
                        default_array = pa.array([""] * num_rows, type=pa.string())
                        if meta_column_schema.name in ['created_at__', 'last_updated_at__']:
                            epoch = datetime(1970, 1, 1)
                            default_array = pa.array([epoch] * num_rows, type=pa.timestamp('ms'))
                        data = data.append_column(meta_column_schema.name, default_array)
                old_schema = data.schema
                # Rebuild the schema so the 'id' field is non-nullable
                new_fields = []
                for field in old_schema:
                    if field.name == 'id':
                        new_fields.append(pa.field(field.name, field.type, nullable=False))
                    else:
                        new_fields.append(field)
                new_schema = pa.schema(new_fields)
                # Cast table to the new schema (this enforces the schema but does not change the data)
                data = data.cast(new_schema)
            id_column_field_id = 1
            for schema_field in schema_fields:
                if schema_field.name == 'id':
                    # schema_field.required = True
                    id_column_field_id = schema_field.field_id
fields_schema = Schema(
*schema_fields,
identifier_field_ids=[id_column_field_id]
)
table_properties = {
"write.target-file-size-bytes": 67108864,
}
table = self.catalog.create_table(
identifier=identifier,
schema=fields_schema,
properties=table_properties,
)
return self.catalog.load_table(table.name()), data
    def _update_table_schema(self, table: Table, schema_details: list[NestedField]):
        existing_columns = {f.name for f in table.schema().fields}
        missing_meta_columns = [col for col in schema_details if col.name not in existing_columns]
        if not missing_meta_columns:
            return table
        # Start a transaction
        with table.transaction() as transaction:
            # Update schema to add missing metadata columns
            update_schema = transaction.update_schema(allow_incompatible_changes=True)
            for col in missing_meta_columns:
                update_schema = update_schema.add_column(path=col.name, field_type=col.field_type, required=col.required)
            update_schema.commit()
        print('schema updated successfully')
        return self.catalog.load_table(table.name())
    def _update_schema_if_needed(self, table: Table, generate_meta_columns: bool) -> Table:
        """Update table schema transactionally to add missing metadata columns."""
        if not generate_meta_columns:
            return table
        existing_columns = {f.name for f in table.schema().fields}
        missing_meta_columns = [col for col in self.metadata_columns if col.name not in existing_columns]
        if not missing_meta_columns:
            return table
        # Start a transaction
        with table.transaction() as transaction:
            # Update schema to add missing metadata columns
            for col in missing_meta_columns:
                transaction.update_schema().add_column(path=col.name, field_type=col.field_type, required=col.required)
            transaction.commit_transaction()
        return self.catalog.load_table(table.name())
def upload_file(
self,
data: Union[str, pd.DataFrame, pa.Table, bytes]=None,
key: str = None,
table_name: str = None,
namespace: str = "default",
autogenerate_id: bool = False,
generate_meta_columns: bool = False,
index_columns: Optional[List[str]] = None
) -> str:
"""Upload file to MinIO or Iceberg transactionally, returning the
file URL."""
logging.debug(f"Uploading to table {namespace}.{table_name} with key
{key}")
        if (self._is_iceberg_format(data) or self._is_iceberg_format(key)) and table_name:
# Iceberg upload
if isinstance(key, str):
ext = os.path.splitext(key)[1].lower()
if ext == ".csv":
data = pd.read_csv(key)
elif ext == ".json":
data = pd.read_json(key)
elif ext == ".parquet":
data = pq.read_table(key)
else:
raise ValueError("Unsupported file format for Iceberg")
# Convert to PyArrow Table
if isinstance(data, pd.DataFrame):
data = pa.Table.from_pandas(data, preserve_index=False)
# Add metadata columns if requested
            if generate_meta_columns:
                data = data.append_column("created_at", pa.array([datetime.now()] * data.num_rows, pa.timestamp("ns")))
                data = data.append_column("source", pa.array(["upload"] * data.num_rows, pa.string()))
            # Add autogenerated id if requested
            if autogenerate_id:
                data = data.append_column("id", pa.array([str(uuid.uuid4()) for _ in range(data.num_rows)], pa.string()))
data = self._convert_data_into_iceberg_supported_format(data)
# Create or load table
table, data = self._create_table_if_not_exists(
table_name, namespace, data, autogenerate_id,
generate_meta_columns, index_columns
)
# Update schema if needed
            table = self._update_schema_if_needed(table, generate_meta_columns)
# cols_to_null = ['B', 'C']
# df.loc[:, cols_to_null] = None
# Perform transactional upload
with table.transaction() as transaction:
logging.debug("Starting transaction")
if autogenerate_id or "id" in [f.name for f in
table.schema().fields]:
existing_data = table.scan().to_arrow()
if "id" in existing_data.column_names:
existing_ids = set(existing_data["id"].to_pylist())
if existing_ids:
new_data =
data.filter(pc.invert(pc.is_in(data["id"], pa.array(existing_ids))))
update_data = data.filter(pc.is_in(data["id"],
pa.array(existing_ids)))
# Append new records
if new_data.num_rows > 0:
transaction.append(new_data)
# Update existing records
if update_data.num_rows > 0:
transaction.overwrite(update_data)
else:
transaction.append(data)
else:
transaction.append(data)
else:
transaction.append(data)
logging.debug("Transaction completed")
# Verify metadata exists
try:
table = self.catalog.load_table(f"{namespace}.{table_name}")
                if not table.metadata.snapshots:
                    raise ValueError(f"No snapshots found for table {namespace}.{table_name}")
except Exception as e:
logging.error(f"Metadata verification failed: {e}")
raise
# Set index properties : indexing not supported till now
# if index_columns:
# for col in index_columns:
# table.update_properties({f"index_{col}": "true"})
logging.debug(f"Upload successful:
s3://{self.bucket_name}/{namespace}/{table_name}")
return f"s3://{self.bucket_name}/{namespace}/{table_name}"
else:
# Direct MinIO upload
if isinstance(data, str):
with open(data, "rb") as f:
data = f.read()
elif isinstance(data, pa.Table):
buffer = BytesIO()
pq.write_table(data, buffer)
data = buffer.getvalue()
elif isinstance(data, pd.DataFrame):
data = pa.Table.from_pandas(data)
buffer = BytesIO()
pq.write_table(data, buffer)
data = buffer.getvalue()
self.minio_client.put_object(
bucket_name=self.bucket_name,
object_name=key,
data=BytesIO(data),
length=len(data)
)
return f"s3://{self.bucket_name}/{key}"
ice_berg_minio_utils = IcebergMinIOUtils()
ice_berg_minio_utils.upload_file(
    key='file1.parquet',
    table_name='arul.sample',
    autogenerate_id=True,
    generate_meta_columns=True,
    index_columns=['id'],
)
```
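A quick way to verify the freshly created table before starting the parallel upserts — a sketch, assuming the same catalog settings and the `default.arul.sample` identifier used above:
```python
from pyiceberg.catalog import load_catalog

# Same catalog settings as in the initialization script above
catalog = load_catalog("default", **{
    "uri": "http://localhost:8181",
    "s3.endpoint": "http://localhost:9000",
    "s3.access-key-id": "admin",
    "s3.secret-access-key": "password",
    "s3.path-style-access": "true",
})

table = catalog.load_table("default.arul.sample")
print(table.metadata_location)            # current metadata JSON object in MinIO
print(len(table.metadata.snapshots))      # should be >= 1 after the initial append
print(table.scan().to_arrow().num_rows)   # row count read back from the new table
```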
### Parallel Upsert Code
```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo
from pyiceberg.table import Table
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from pyiceberg.exceptions import CommitFailedException
import pyarrow as pa
# Helper: Convert Iceberg type to PyArrow type
def iceberg_type_to_pyarrow_type(iceberg_type):
from pyiceberg.types import (
IntegerType, LongType, FloatType, DoubleType,
StringType, BooleanType, TimestampType, DateType,
)
if isinstance(iceberg_type, IntegerType):
return pa.int32()
elif isinstance(iceberg_type, LongType):
return pa.int64()
elif isinstance(iceberg_type, FloatType):
return pa.float32()
elif isinstance(iceberg_type, DoubleType):
return pa.float64()
elif isinstance(iceberg_type, StringType):
return pa.string()
elif isinstance(iceberg_type, BooleanType):
return pa.bool_()
elif isinstance(iceberg_type, TimestampType):
return pa.timestamp("us")
elif isinstance(iceberg_type, DateType):
return pa.date32()
else:
raise NotImplementedError(f"Unsupported type: {iceberg_type}")
# Initialize the catalog
catalog = load_catalog(
"default",
**{
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
"s3.path-style-access": "true",
}
)
# Load your table
table: Table = catalog.load_table("default.arul.sample")
iceberg_schema = table.schema()
# Build PyArrow schema
arrow_fields = [
    pa.field(field.name, iceberg_type_to_pyarrow_type(field.field_type), nullable=not field.required)
    for field in iceberg_schema.fields
]
arrow_schema = pa.schema(arrow_fields)
def perform_upsert(partition_data):
"""Function to perform upsert on a partition of data"""
try:
        df = pa.Table.from_pandas(pd.DataFrame(partition_data), schema=arrow_schema)
upstr = table.upsert(df)
print(f"Successfully upserted partition with {len(df)} records")
print("upstr.rows_inserted :",upstr.rows_inserted)
print("upstr.rows_updated :",upstr.rows_updated)
        # except CommitFailedException:
        #     print("==== Retrying the update for records ====")
        #     perform_upsert(partition_data)
except Exception as e:
print(f"Error in upsert: {str(e)}")
all_data = [
[
{
"guid": "b1004c82-6b45-48dd-b657-22044b6717d3",
"file1_int_a": 0,
"file1_float_b": 0.4099746151,
"file1_string_c": "string1_0",
"file1_int_1": 700708,
"file1_float_1": -0.00013162700000000002,
"file1_string_1": "unique_str1_feae5ebe-20af-446e-b7cd-0d239cee4f0a",
"file1_bool_1": True,
"file1_date_1": 1775537823,
"file1_cat_1": "P",
"file1_int_2": 377626,
"file1_float_2": 2.8005186429,
"file1_string_2": "changed_by_parallel_calls",
"file1_bool_2": True,
"file1_date_2": 1795124582,
"file1_cat_2": "V",
"file1_int_3": 650704,
"file1_float_3": 0.0011361511,
"file1_string_3":
"final_unique_str1_7e8f1a71-faae-4071-a6d6-fa45d17f5645",
"file1_bool_3": True,
"created_at": "2025-06-11 16:20:58",
"source": "upload",
"id": "ec99ffef-d990-4f01-beea-3f4ac19bd502",
"recordguid__": "",
"recordstatus__": "",
"created_by__": "",
"owner__": "",
"last_updated_by__": "",
"created_at__": 0,
"last_updated_at__": 0,
"tags__": "",
"user_actions__": "",
"proposals__": "",
"link_data__": "",
"related_data__": "",
"signal__": "",
"exceptions__": "",
"remediation__": ""
}
],
[
{
"guid": "7d59bd17-b754-4474-a0d6-5c662edf49bd",
"file1_int_a": 165230,
"file1_float_b": 0.1493543918,
"file1_string_c": "string1_165230",
"file1_int_1": 649565,
"file1_float_1": 0.1653516706,
"file1_string_1": "unique_str1_e46814a7-1ce0-4284-ae17-3596f8462639",
"file1_bool_1": True,
"file1_date_1": 1852705711,
"file1_cat_1": "S",
"file1_int_2": 169692,
"file1_float_2": 0.8480360042,
"file1_string_2": "changed_by_parallel_calls",
"file1_bool_2": False,
"file1_date_2": 1815319797,
"file1_cat_2": "Y",
"file1_int_3": 1512764,
"file1_float_3": 1.9787666482000001,
"file1_string_3":
"final_unique_str1_462118c4-5470-4b74-bce1-62183fa0bee1",
"file1_bool_3": False,
"created_at": "2025-06-11 16:20:58",
"source": "upload",
"id": "e0d8b00e-d538-4159-a843-7e44d9744c0c",
"recordguid__": "",
"recordstatus__": "",
"created_by__": "",
"owner__": "",
"last_updated_by__": "",
"created_at__": 0,
"last_updated_at__": 0,
"tags__": "",
"user_actions__": "",
"proposals__": "",
"link_data__": "",
"related_data__": "",
"signal__": "",
"exceptions__": "",
"remediation__": ""
}
],
[
{
"guid": "df31072c-b838-474f-a883-a9797fcf5524",
"file1_int_a": 330460,
"file1_float_b": 0.7084970022,
"file1_string_c": "string1_330460",
"file1_int_1": 731568,
"file1_float_1": 0.33039105500000004,
"file1_string_1": "unique_str1_7bd9fa0e-d7f5-4e1c-9c6d-b9cd9b544a64",
"file1_bool_1": False,
"file1_date_1": 1833308442,
"file1_cat_1": "T",
"file1_int_2": -10388,
"file1_float_2": 2.3386936906,
"file1_string_2": "changed_by_parallel_calls",
"file1_bool_2": False,
"file1_date_2": 1822519343,
"file1_cat_2": "V",
"file1_int_3": 94375,
"file1_float_3": 0.6904563792,
"file1_string_3":
"final_unique_str1_a8ec3b67-031a-4ec4-b80e-1cca33c7d09c",
"file1_bool_3": False,
"created_at": "2025-06-11 16:20:58",
"source": "upload",
"id": "00da629e-0790-462f-ba9a-44fd3db3ecc6",
"recordguid__": "",
"recordstatus__": "",
"created_by__": "",
"owner__": "",
"last_updated_by__": "",
"created_at__": 0,
"last_updated_at__": 0,
"tags__": "",
"user_actions__": "",
"proposals__": "",
"link_data__": "",
"related_data__": "",
"signal__": "",
"exceptions__": "",
"remediation__": ""
}
]
]
# Perform parallel upserts
with ThreadPoolExecutor(max_workers=4) as executor:  # Adjust max_workers as needed
    futures = [executor.submit(perform_upsert, partition) for partition in all_data]
# Wait for all futures to complete
for future in futures:
future.result() # This will raise exceptions if any occurred
```
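For comparison, the obvious client-side workaround would be to serialize the commits — a sketch only, reusing `pa`, `pd`, `arrow_schema`, `catalog`, and `all_data` from the script above; the lock-based `perform_upsert_serialized` helper is hypothetical and not part of the failing run:
```python
import threading
from concurrent.futures import ThreadPoolExecutor

upsert_lock = threading.Lock()

def perform_upsert_serialized(partition_data):
    """Same upsert as above, but only one thread commits at a time (process-local workaround)."""
    df = pa.Table.from_pandas(pd.DataFrame(partition_data), schema=arrow_schema)
    with upsert_lock:
        # Reload so each commit starts from the latest metadata written by the previous thread
        tbl = catalog.load_table("default.arul.sample")
        result = tbl.upsert(df)
    print(f"inserted={result.rows_inserted}, updated={result.rows_updated}")

with ThreadPoolExecutor(max_workers=4) as executor:
    for future in [executor.submit(perform_upsert_serialized, p) for p in all_data]:
        future.result()
```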
## Environment Details
- apache/iceberg-rest-fixture: `latest` (about a month old)
- tabulario/spark-iceberg: `latest` (about 3 months old)
- PyIceberg version: 0.9.1
- MinIO version: RELEASE.2025-04-22T22-12-26Z
- Storage backend: MinIO
## Additional Observations
- The issue seems related to concurrent metadata file updates
- Physical data files remain intact, suggesting only the metadata is affected (a quick state check is sketched after this list)
- The problem becomes more likely with higher concurrency
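One way to check both the catalog state and what is left in object storage after a failed run — a sketch, assuming the same endpoints and the `warehouse` bucket from the scripts above:
```python
from minio import Minio
from pyiceberg.catalog import load_catalog

# Same settings as in the scripts above; endpoints may need adjusting outside Docker
minio_client = Minio("localhost:9000", access_key="admin", secret_key="password", secure=False)
catalog = load_catalog("default", **{
    "uri": "http://localhost:8181",
    "s3.endpoint": "http://localhost:9000",
    "s3.access-key-id": "admin",
    "s3.secret-access-key": "password",
    "s3.path-style-access": "true",
})

# What the REST catalog still reports after a failed run
print(catalog.list_namespaces())
print(catalog.list_tables("default"))

# Objects (data and metadata files) still present in the warehouse bucket
for obj in minio_client.list_objects("warehouse", recursive=True):
    print(obj.object_name)
```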
## Request
1. Help identify the root cause of the metadata corruption
2. Suggestions for safe parallel upsert patterns in Iceberg
3. Any known issues or best practices around concurrent writes in Iceberg
4. Recovery options for the corrupted table metadata
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from
the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]