QlikFrederic opened a new issue, #2409:
URL: https://github.com/apache/iceberg-python/issues/2409

   ### Apache Iceberg version
   
   main (development)
   
   ### Please describe the bug 🐞
   
   When running the maintenance task on two Iceberg tables in separate threads,
the commit step tries to commit the snapshot expiration to the wrong table,
resulting in an error (snapshot does not exist).
   
   Script to reproduce the issue:
   ```
   from pyiceberg.catalog.memory import InMemoryCatalog
   from datetime import datetime, timezone
   import polars as pl
   import threading
   import time
   import random
   
   def generate_df(batch_id=0):
       """Build a 1000-row polars DataFrame of synthetic "playback" events.

       Args:
           batch_id: Batch number baked into each row's ``data`` struct
               (``calendarKey``, ``id`` offset and ``referenceId``).

       Returns:
           A ``pl.DataFrame`` with the columns ``event_type``,
           ``event_origin``, ``event_send_at``, ``event_saved_at`` and ``data``.
       """
       row_count = 1000
       columns = {
           "event_type": ["playback"] * row_count,
           # A single random origin is drawn per call and repeated on every row.
           "event_origin": [f"origin{random.randint(1, 5)}"] * row_count,
           # Each timestamp is taken once per call and shared by all rows.
           "event_send_at": [datetime.now(timezone.utc)] * row_count,
           "event_saved_at": [datetime.now(timezone.utc)] * row_count,
           "data": [
               {
                   "calendarKey": f"calendarKey-{batch_id}",
                   # Offsetting by the batch keeps ids distinct across batches.
                   "id": str(row + batch_id * row_count),
                   "referenceId": f"ref-{batch_id}-{row}",
               }
               for row in range(row_count)
           ],
       }
       return pl.DataFrame(columns)
   
   # Sample frame used only to derive the Arrow schema for both tables.
   df = generate_df()
   catalog = InMemoryCatalog("default", warehouse="/tmp/iceberg")
   catalog.create_namespace_if_not_exists("default")
   # Two independent tables with identical schemas but separate locations.
   table1 = catalog.create_table_if_not_exists(
       "default.table1",
       schema=df.to_arrow().schema,
       location="/tmp/iceberg/table1",
   )
   
   table2 = catalog.create_table_if_not_exists(
       "default.table2",
       schema=df.to_arrow().schema,
       location="/tmp/iceberg/table2",
   )
   
   # Function to add multiple commits to a table
   def add_commits_to_table(table, table_name, num_commits=5):
       """Append ``num_commits`` generated batches to ``table``, one at a time.

       Args:
           table: Object exposing ``append()``; receives each batch as Arrow data.
           table_name: Label used in the progress messages.
           num_commits: Number of batches to generate and append.
       """
       print(f"Adding {num_commits} commits to {table_name}")
       for batch_id in range(num_commits):
           arrow_batch = generate_df(batch_id=batch_id).to_arrow()
           table.append(arrow_batch)
           commit_number = batch_id + 1
           print(f"  Added commit {commit_number} to {table_name}")
           # Small delay between commits
           time.sleep(0.2)
   
   # Seed both tables one after the other (default of 5 commits each) before
   # the threaded expiration step further down runs.
   print("Creating multiple snapshots...")
   add_commits_to_table(table1, "table1")
   add_commits_to_table(table2, "table2")
   
   # Function to expire oldest 3 snapshots in a thread
   def expire_oldest_snapshots(table, table_name):
       """Expire the first three snapshots listed for ``table``.

       Intended as a thread target: every failure is caught and printed rather
       than raised, so the error surfaces in the script output.

       Args:
           table: Object exposing ``snapshots()`` and
               ``maintenance.expire_snapshots()``.
           table_name: Label used to prefix every printed message.
       """
       try:
           # Get all snapshots
           snapshots = list(table.snapshots())
           if len(snapshots) <= 3:
               print(f"{table_name}: Not enough snapshots to expire 3 (only {len(snapshots)})")
               return

           # The first 3 entries are treated as the oldest snapshots.
           oldest_ids = [snapshot.snapshot_id for snapshot in snapshots[:3]]

           print(f"{table_name}: Found {len(snapshots)} snapshots, expiring oldest 3: {oldest_ids}")

           # Expire the oldest 3 snapshots by ID, one commit per snapshot.
           for snapshot_id in oldest_ids:
               table.maintenance.expire_snapshots().by_id(snapshot_id).commit()

           # can also be replaced with:
           # table.maintenance.expire_snapshots().by_ids(oldest_ids).commit()

           print(f"{table_name}: Successfully expired snapshots {oldest_ids}")

       except Exception as e:
           # Deliberately broad: report whatever the commit raised from this thread.
           print(f"{table_name}: Error expiring snapshots: {e}")
   
   # Run expire_snapshots in parallel threads, one thread per table
   print("\nRunning expire_snapshots in parallel threads...")
   thread1 = threading.Thread(
       target=expire_oldest_snapshots, args=(table1, "table1")
   )
   thread2 = threading.Thread(
       target=expire_oldest_snapshots, args=(table2, "table2")
   )
   
   # Start both before joining either so the two expirations overlap.
   thread1.start()
   thread2.start()
   
   thread1.join()
   thread2.join()
   
   print("\nDone!")
   ```
   
   
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [x] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to