chengchengpei commented on issue #1305:
URL:
https://github.com/apache/iceberg-python/issues/1305#issuecomment-2465678139
I tried to run the following code (multiple processes calling `append` on the
same Iceberg table):
```
import os
import time
from multiprocessing import Pool
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, BinaryType
import base64
import boto3
from utils import list_files
def process_batch(batch):
print('start processing batch')
ids = []
image_data = []
for image_path, image_name in batch:
with open(image_path, "rb") as f:
image_data.append(base64.b64encode(f.read()))
ids.append(image_name)
table_data = pa.Table.from_pydict({"id": ids, "image_data": image_data})
start = time.time()
catalog = load_catalog("glue", **{
"type": "glue",
"region": "us-east-1",
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.access-key-id": os.getenv("AWS_KEY_ID"),
"s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
"max-workers": 8
})
catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
end = time.time()
print('uploaded {} in {} seconds'.format(len(ids), end - start))
return len(ids), end - start
if __name__ == "__main__":
# Create a schema for the Iceberg table
schema = Schema(
NestedField(1, "id", StringType()),
NestedField(2, "image_data", BinaryType())
)
# Load the Iceberg catalog
catalog = load_catalog("glue", **{
"type": "glue",
"region": "us-east-1",
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.access-key-id": os.getenv("AWS_KEY_ID"),
"s3.secret-access-key": os.getenv("AWS_SECRET_ACCESS_KEY"),
"max-workers": 8
# "write.parquet.compression-codec": "snappy"
})
catalog.create_namespace_if_not_exists("test")
# Create an Iceberg table
table = catalog.create_table_if_not_exists(
identifier="test.imagenet-object-localization-challenge-10000",
schema=schema,
location="s3://test/iceberg-data/")
# Load images and convert to base64
images_list = list_files("/Users/ILSVRC/data/CLS-LOC/test/",
extension=".JPEG")
batch_size = 10000
total_batches = 10
processes = []
batches = [images_list[i:i + batch_size] for i in
range(0, min(len(images_list), total_batches * batch_size),
batch_size)]
with Pool(4) as pool:
results = pool.map(process_batch, batches)
for result in results:
print('uploaded {} in {} seconds'.format(result[0], result[1]))
```
but got:
```
Traceback (most recent call last):
File
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
line 125, in worker
result = (True, func(*args, **kwds))
File
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
line 48, in mapstar
return list(map(*args))
File "/Users/test/write_images_to_iceberg.py", line 51, in process_batch
catalog.load_table("test.imagenet-object-localization-challenge-10000").append(table_data)
File
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py",
line 1578, in append
tx.append(df=df, snapshot_properties=snapshot_properties)
File
"/Users/tst/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py", line
289, in __exit__
self.commit_transaction()
File
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py",
line 712, in commit_transaction
self._table._do_commit( # pylint: disable=W0212
File
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py",
line 1638, in _do_commit
response = self.catalog._commit_table( # pylint: disable=W0212
File
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/glue.py", line
484, in _commit_table
updated_staged_table = self._update_and_stage_table(current_table,
table_request)
File
"/Users/test/venv/lib/python3.9/site-packages/pyiceberg/catalog/__init__.py",
line 835, in _update_and_stage_table
requirement.validate(current_table.metadata if current_table else None)
File
"/Users/tests/venv/lib/python3.9/site-packages/pyiceberg/table/__init__.py",
line 1262, in validate
raise CommitFailedException(
pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main
has changed: expected id 2633742078255924117, found 3998254648540280684
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/test/write_images_to_iceberg.py", line 92, in <module>
results = pool.map(process_batch, batches)
File
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File
"/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py",
line 771, in get
raise self._value
pyiceberg.exceptions.CommitFailedException: Requirement failed: branch main
has changed: expected id 2633742078255924117, found 3998254648540280684
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]