QlikFrederic opened a new issue, #2409: URL: https://github.com/apache/iceberg-python/issues/2409
### Apache Iceberg version main (development) ### Please describe the bug 🐞 When using two iceberg tables and running the maintenance task in threads, the commit step will try to commit the snapshot expiration to the wrong table resulting in an error (snapshot does not exist). Script to reproduce the issue: ``` from pyiceberg.catalog.memory import InMemoryCatalog from datetime import datetime, timezone import polars as pl import threading import time import random def generate_df(batch_id=0): df = pl.DataFrame( { "event_type": ["playback"] * 1000, "event_origin": [f"origin{random.randint(1, 5)}"] * 1000, "event_send_at": [datetime.now(timezone.utc)] * 1000, "event_saved_at": [datetime.now(timezone.utc)] * 1000, "data": [ { "calendarKey": f"calendarKey-{batch_id}", "id": str(i + batch_id * 1000), "referenceId": f"ref-{batch_id}-{i}", } for i in range(1000) ], } ) return df df = generate_df() catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg") catalog.create_namespace_if_not_exists("default") table1 = catalog.create_table_if_not_exists( "default.table1", schema=df.to_arrow().schema, location="/tmp/iceberg/table1" ) table2 = catalog.create_table_if_not_exists( "default.table2", schema=df.to_arrow().schema, location="/tmp/iceberg/table2" ) # Function to add multiple commits to a table def add_commits_to_table(table, table_name, num_commits=5): print(f"Adding {num_commits} commits to {table_name}") for i in range(num_commits): df_batch = generate_df(batch_id=i) table.append(df_batch.to_arrow()) print(f" Added commit {i+1} to {table_name}") time.sleep(0.2) # Small delay between commits # Add multiple commits to both tables print("Creating multiple snapshots...") add_commits_to_table(table1, "table1") add_commits_to_table(table2, "table2") # Function to expire oldest 3 snapshots in a thread def expire_oldest_snapshots(table, table_name): try: # Get all snapshots snapshots = list(table.snapshots()) if len(snapshots) <= 3: print(f"{table_name}: Not enough 
snapshots to expire 3 (only {len(snapshots)})") return # Find the oldest 3 snapshots oldest_snapshots = snapshots[:3] oldest_ids = [snapshot.snapshot_id for snapshot in oldest_snapshots] print(f"{table_name}: Found {len(snapshots)} snapshots, expiring oldest 3: {oldest_ids}") # Expire the oldest 3 snapshots by IDs for id in oldest_ids: table.maintenance.expire_snapshots().by_id(id).commit() # can also be replaced with: # table.maintenance.expire_snapshots().by_ids(oldest_ids).commit() print(f"{table_name}: Successfully expired snapshots {oldest_ids}") except Exception as e: print(f"{table_name}: Error expiring snapshots: {e}") # Run expire_snapshots in parallel threads print("\nRunning expire_snapshots in parallel threads...") thread1 = threading.Thread(target=expire_oldest_snapshots, args=(table1, "table1")) thread2 = threading.Thread(target=expire_oldest_snapshots, args=(table2, "table2")) thread1.start() thread2.start() thread1.join() thread2.join() print("\nDone!") ``` ### Willingness to contribute - [ ] I can contribute a fix for this bug independently - [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community - [ ] I cannot contribute a fix for this bug at this time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org