bigluck opened a new issue, #542:
URL: https://github.com/apache/iceberg-python/issues/542

   ### Apache Iceberg version
   
   0.6.0 (latest release)
   
   ### Please describe the bug 🐞
   
   I'm facing a race condition when calling `table.scan` in my code. For some reason, the scan finishes before the final table is assembled, and `to_arrow()` returns an empty table.
   
   This is my code:
   ```python
   from pyiceberg.table import StaticTable
   from pyiceberg.expressions import AlwaysTrue
   
   
   table = StaticTable.from_metadata(
       metadata_location='s3a://my_s3_bucket/iceberg/taxi_fhvhv_7eba066f-7498-4a8a-b932-bf2dbcc938fd/meta...',  # full path truncated
   )
   res = table.scan(
       selected_fields=("*",),
       row_filter=AlwaysTrue(),
       limit=10_000,
   )
   
   print('A >>', res)
   
   a_res = res.to_arrow()
   print('B >>', a_res)
   print('C >>', a_res.num_rows)
   ```
   
   Which returns:
   ```
   root@45daaeef1ce1:/# python test.py
   A >> <pyiceberg.table.DataScan object at 0x7fece38ec6d0>
   B >> pyarrow.Table
   hvfhs_license_num: string
   dispatching_base_num: string
   originating_base_num: string
   request_datetime: timestamp[us, tz=UTC]
   on_scene_datetime: timestamp[us, tz=UTC]
   pickup_datetime: timestamp[us, tz=UTC]
   dropoff_datetime: timestamp[us, tz=UTC]
   PULocationID: int64
   DOLocationID: int64
   trip_miles: double
   trip_time: int64
   base_passenger_fare: double
   tolls: double
   bcf: double
   sales_tax: double
   congestion_surcharge: double
   airport_fee: int32
   tips: double
   driver_pay: double
   shared_request_flag: string
   shared_match_flag: string
   access_a_ride_flag: string
   wav_request_flag: string
   wav_match_flag: string
   ----
   hvfhs_license_num: []
   dispatching_base_num: []
   originating_base_num: []
   request_datetime: []
   on_scene_datetime: []
   pickup_datetime: []
   dropoff_datetime: []
   PULocationID: []
   DOLocationID: []
   trip_miles: []
   ...
   C >> 0
   ```
   
   I think the problem happens here: 
   
https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L1021-L1023
   
   The code appends to the `row_counts` list before returning the table. If multiple tasks run concurrently, the next task that enters `_task_to_table` returns `None` because of this check:
   
   
https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L941-L954
   
   I think this happens because the task that actually holds the data is still processing its table at this point:
https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L1023
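To make the interleaving concrete, here is a minimal, self-contained sketch of the check-then-publish pattern described above. It is not pyiceberg code: `fake_task_to_table` is a hypothetical stand-in for `_task_to_table`, plain lists of ints stand in for Arrow tables, and the `published`/`resume` events exist only to force the problematic ordering deterministically.

```python
import threading
from typing import List, Optional


def fake_task_to_table(
    rows: List[int],
    row_counts: List[int],
    limit: Optional[int],
    published: Optional[threading.Event] = None,
    resume: Optional[threading.Event] = None,
) -> Optional[List[int]]:
    """Hypothetical stand-in for `_task_to_table`; `rows` is what one data file yields."""
    # Early exit: skip this file if the shared counter already reaches the limit.
    if limit is not None and sum(row_counts) >= limit:
        return None
    # The row count becomes visible to every other task here...
    row_counts.append(len(rows))
    if published is not None:
        published.set()  # signal that the count is now public
    if resume is not None:
        resume.wait()  # stand-in for post-processing still in progress
    # ...but the rows only reach the caller when this return executes.
    return rows


row_counts: List[int] = []
published, resume = threading.Event(), threading.Event()
result_a: List[Optional[List[int]]] = []

# Task A owns 10 rows and pauses right after publishing its count.
thread_a = threading.Thread(
    target=lambda: result_a.append(
        fake_task_to_table(list(range(10)), row_counts, 10, published, resume)
    )
)
thread_a.start()
published.wait()

# Task B starts while A is still "post-processing" and sees the limit as met.
result_b = fake_task_to_table(list(range(5)), row_counts, 10)
print(result_b)  # None
print(result_a)  # [] (task A has not returned yet)

resume.set()
thread_a.join()
print(len(result_a[0]))  # 10
```

Task B never produces any rows, yet the shared counter already reads 10 while task A's rows have not been handed back to anyone.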
   
   So what I suppose happens is that the task that returned `None` completes before the task holding the table data. As a result, `completed_futures` contains only the `None` future, which causes the code to return an empty table:
   
   
https://github.com/apache/iceberg-python/blob/6989b92c2d449beb9fe4817c64f619ea5bfc81dc/pyiceberg/io/pyarrow.py#L1111C1-L1116C18
   
   
   This is the content of the `completed_futures` and `tables` variables in the `project_table` function:
   ```
   >>>>> completed_futures SortedKeyList([<Future at 0x7fa7e4b97fd0 state=finished returned NoneType>], key=<function project_table.<locals>.<lambda> at 0x7fa90316c4a0>)
   >>>>> tables []
   ```
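The consumer side can be reproduced the same way. The sketch below assumes the loop in `project_table` behaves as described in this report (break as soon as `sum(row_counts) >= limit`, then keep only non-`None` results). `fake_task` and `project` are hypothetical names, lists of ints stand in for Arrow tables, and a plain list sorted by submission index replaces `SortedList` to avoid the `sortedcontainers` dependency.

```python
import concurrent.futures
import threading
from typing import List, Optional


def fake_task(
    rows: List[int],
    row_counts: List[int],
    limit: Optional[int],
    published: Optional[threading.Event] = None,
    resume: Optional[threading.Event] = None,
    start_after: Optional[threading.Event] = None,
) -> Optional[List[int]]:
    """Hypothetical scan task with the same check-then-publish pattern."""
    if start_after is not None:
        start_after.wait()  # force this task to start after another one published
    if limit is not None and sum(row_counts) >= limit:
        return None
    row_counts.append(len(rows))
    if published is not None:
        published.set()
    if resume is not None:
        resume.wait()  # result withheld: simulates slow post-processing
    return rows


def project(limit: Optional[int] = 10) -> List[List[int]]:
    """Hypothetical consumer loop modeled on the behavior described in the report."""
    row_counts: List[int] = []
    published, resume = threading.Event(), threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(fake_task, list(range(10)), row_counts, limit,
                            published=published, resume=resume),
            executor.submit(fake_task, list(range(5)), row_counts, limit,
                            start_after=published),
        ]
        futures_index = {f: i for i, f in enumerate(futures)}
        completed = []
        for future in concurrent.futures.as_completed(futures):
            completed.append(future)
            # Stop early once the *shared counter* says the limit is satisfied.
            if limit is not None and sum(row_counts) >= limit:
                print(limit, sum(row_counts), future.result())  # 10 10 None
                break
        # Restore submission order and drop `None` results.
        completed.sort(key=futures_index.__getitem__)
        tables = [f.result() for f in completed if f.result()]
        resume.set()  # unblock task A so the executor can shut down cleanly
    return tables


print(project())  # []
```

The `print` inside the loop has the same shape as the debug line the report gets from its modified loop, and `project()` returns an empty list, the analogue of the empty Arrow table.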
   
   And after adding a debug print to the loop:
   ```python
       # for consistent ordering, we need to maintain future order
       futures_index = {f: i for i, f in enumerate(futures)}
       completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
       for future in concurrent.futures.as_completed(futures):
           completed_futures.add(future)
   
           # stop early if limit is satisfied
           if limit is not None and sum(row_counts) >= limit:
               print('>>>>> ', limit, sum(row_counts), future.result())
               break
   ```
   
   I got:
   ```
   >>>>>  10000 10000 None
   ```
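One possible mitigation, offered as a hypothetical sketch rather than the project's actual fix: let the consumer decide when to stop based on rows it has actually received from `future.result()`, instead of the shared counter that tasks update before their results are ready. `fake_task` and `project_counting_results` are hypothetical names, lists of ints stand in for Arrow tables, and a short `time.sleep` stands in for post-processing that continues after the count is published.

```python
import concurrent.futures
import threading
import time
from typing import List, Optional


def fake_task(
    rows: List[int],
    row_counts: List[int],
    limit: Optional[int],
    published: Optional[threading.Event] = None,
    start_after: Optional[threading.Event] = None,
    delay: float = 0.0,
) -> Optional[List[int]]:
    """Hypothetical scan task with the same check-then-publish pattern."""
    if start_after is not None:
        start_after.wait()
    if limit is not None and sum(row_counts) >= limit:
        return None
    row_counts.append(len(rows))
    if published is not None:
        published.set()
    time.sleep(delay)  # simulated post-processing after the count is published
    return rows


def project_counting_results(limit: Optional[int] = 10) -> List[List[int]]:
    # Still shared, but only used for the tasks' own early exit.
    row_counts: List[int] = []
    published = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(fake_task, list(range(10)), row_counts, limit,
                            published=published, delay=0.05),
            executor.submit(fake_task, list(range(5)), row_counts, limit,
                            start_after=published),
        ]
        futures_index = {f: i for i, f in enumerate(futures)}
        completed = []
        collected_rows = 0
        for future in concurrent.futures.as_completed(futures):
            completed.append(future)
            result = future.result()
            # Count only rows whose result has actually been returned.
            if result is not None:
                collected_rows += len(result)
            if limit is not None and collected_rows >= limit:
                break
        completed.sort(key=futures_index.__getitem__)
        tables = [f.result() for f in completed if f.result()]
    return tables


print(project_counting_results())  # [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]
```

The tasks' own early exit is left unchanged, so the second task still skips its work, but the loop no longer breaks until the first task's rows have actually been collected.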


-- 
This is an automated message from the Apache Git Service.