kevinjqliu opened a new issue, #1506:
URL: https://github.com/apache/iceberg-python/issues/1506
### Apache Iceberg version
None
### Please describe the bug 🐞
From slack,
"""
Hi team, There have been occasional reports from internal users that the
number of records retrieved when loading data with PyIceberg sometimes differs.
Since it is difficult to determine the specific circumstances under which this
occurs, reproducing the issue has been challenging. What kind of logging should
be implemented to help identify the root cause when the issue arises?
"""
```
Git commit: a051584a3684392d2db6556449eb299145d47d15 (pyiceberg-0.8.1 tag)
def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
"""Scan the Iceberg table and return a pa.Table.
Returns a pa.Table with data from the Iceberg table by resolving the
right columns that match the current table schema. Only data that
matches the provided row_filter expression is returned.
Args:
tasks: FileScanTasks representing the data files and delete
files to read from.
Returns:
A PyArrow table. Total number of rows will be capped if
specified.
Raises:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to
the schema type
"""
deletes_per_file = _read_all_delete_files(self._fs, tasks)
executor = ExecutorFactory.get_or_create()
def _table_from_scan_task(task: FileScanTask) -> pa.Table:
batches =
list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
if len(batches) > 0:
return pa.Table.from_batches(batches)
else:
return None
futures = [
executor.submit(
_table_from_scan_task,
task,
)
for task in tasks
]
logger.info(f"Number of tasks: {len(tasks)} Number of Futures:
{len(futures)}")
total_row_count = 0
# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[pa.Table]] =
SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
completed_futures.add(future)
if table_result := future.result():
total_row_count += len(table_result)
# stop early if limit is satisfied
if self._limit is not None and total_row_count >= self._limit:
break
# by now, we've either completed all tasks or satisfied the limit
if self._limit is not None:
_ = [f.cancel() for f in futures if not f.done()]
tables = [f.result() for f in completed_futures if f.result()]
if len(tables) < 1:
return pa.Table.from_batches([],
schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
result = pa.concat_tables(tables, promote_options="permissive")
if self._limit is not None:
return result.slice(0, self._limit)
logger.info(f"total_row_count: {total_row_count}, len(tables):
{len(tables)} len(completed_futures): {len(completed_futures)}")
logger.info([(i, t.num_rows) for i, t in enumerate(tables)])
logger.info([(i, t.file.file_path) for i, t in enumerate(tasks)])
return result
Is the tasks variable in the to_table() function of the ArrowScan class
non-deterministic?
While debugging, I observed that applying the same row_filter to the same
table sometimes results in a different number of tasks. In cases where data
loss occurs, I noticed that the number of table_result objects retrieved via
multiprocessing varies.
total_row_count must be 100000, but at times it is between 97000 and 100000
2025-01-10 18:34:26 - pyiceberg.io.pyarrow - INFO - log init
2025-01-10 18:34:26 - pyiceberg.io - INFO - Loaded FileIO:
pyiceberg.io.pyarrow.PyArrowFileIO
2025-01-10 18:36:02 - pyiceberg.io.pyarrow - INFO - total_row_count: 100000,
len(completed_futures): 114
2025-01-10 18:36:02 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2,
861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859),
(10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929),
(17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853),
(24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868),
(31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878),
(38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887),
(45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937),
(52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920),
(59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882),
(66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921),
(73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887),
(80, 915), (81, 874), (82, 906), (83, 930), (84, 874), (85, 872), (86, 8
71), (87, 877), (88, 891), (89, 820), (90, 877), (91, 876), (92, 928), (93,
859), (94, 878), (95, 884), (96, 954), (97, 856), (98, 924), (99, 859), (100,
914), (101, 892), (102, 889), (103, 886), (104, 882), (105, 915), (106, 862),
(107, 907), (108, 886), (109, 837), (110, 910), (111, 963), (112, 926), (113,
872)] # tables row index and count
2025-01-10 18:38:32 - pyiceberg.io.pyarrow - INFO - total_row_count: 99126,
len(completed_futures): 116
2025-01-10 18:38:32 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2,
861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859),
(10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929),
(17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853),
(24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868),
(31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878),
(38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887),
(45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937),
(52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920),
(59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882),
(66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921),
(73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887),
(80, 915), (81, 906), (82, 930), (83, 874), (84, 872), (85, 871), (86, 8
77), (87, 891), (88, 820), (89, 877), (90, 876), (91, 928), (92, 859), (93,
878), (94, 884), (95, 954), (96, 856), (97, 924), (98, 859), (99, 914), (100,
892), (101, 889), (102, 886), (103, 882), (104, 915), (105, 862), (106, 907),
(107, 886), (108, 837), (109, 910), (110, 963), (111, 926), (112, 872)] #
tables row index and count
2025-01-10 19:12:18 - pyiceberg.io.pyarrow - INFO - log init
2025-01-10 19:12:18 - pyiceberg.io - INFO - Loaded FileIO:
pyiceberg.io.pyarrow.PyArrowFileIO
2025-01-10 19:14:36 - pyiceberg.io.pyarrow - INFO - Number of tasks: 115
Number of Futures: 115
2025-01-10 19:16:26 - pyiceberg.io.pyarrow - INFO - total_row_count: 100000,
len(completed_futures): 115
2025-01-10 19:16:26 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2,
861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859),
(10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929),
(17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853),
(24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868),
(31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878),
(38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887),
(45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937),
(52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920),
(59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882),
(66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921),
(73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887),
(80, 915), (81, 874), (82, 906), (83, 930), (84, 874), (85, 872), (86, 8
71), (87, 877), (88, 891), (89, 820), (90, 877), (91, 876), (92, 928), (93,
859), (94, 878), (95, 884), (96, 954), (97, 856), (98, 924), (99, 859), (100,
914), (101, 892), (102, 889), (103, 886), (104, 882), (105, 915), (106, 862),
(107, 907), (108, 886), (109, 837), (110, 910), (111, 963), (112, 926), (113,
872)] # tables row index and count
```
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from
the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]