TomAugspurger commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1443857629
Here's a rough version of a `DataScan`-to-Dask-DataFrame conversion.
```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual


def _file_to_pandas(fs, task, bound_row_filter, projected_schema,
                    projected_field_ids, case_sensitive):
    from pyiceberg.io.pyarrow import _file_to_table

    return _file_to_table(fs, task, bound_row_filter, projected_schema,
                          projected_field_ids, case_sensitive).to_pandas()


def to_dask_dataframe(sc):
    """Convert a DataScan to a Dask DataFrame."""
    from pyiceberg.io.pyarrow import (
        PyArrowFileIO, bind, extract_field_ids, schema_to_pyarrow, MapType,
        ListType,
    )
    import pyarrow as pa
    import dask
    import dask.dataframe as dd

    # arguments
    tasks = list(sc.plan_files())
    table = sc.table
    row_filter = sc.row_filter
    projected_schema = sc.projection()
    case_sensitive = sc.case_sensitive

    # stuff stolen from to_arrow()
    if isinstance(table.io, PyArrowFileIO):
        scheme, _ = PyArrowFileIO.parse_location(table.location())
        fs = table.io.get_fs(scheme)
    else:
        raise ValueError(f"Expected PyArrowFileIO, got: {table.io}")

    bound_row_filter = bind(table.schema(), row_filter,
                            case_sensitive=case_sensitive)
    projected_field_ids = {
        id for id in projected_schema.field_ids
        if not isinstance(projected_schema.find_type(id), (MapType, ListType))
    }.union(extract_field_ids(bound_row_filter))

    # build the Dask DataFrame
    schema = schema_to_pyarrow(projected_schema)
    names = [x.name for x in projected_schema.fields]
    # empty table with the projected schema -> empty pandas DataFrame used as meta
    meta = pa.table([[]] * len(schema.names), schema=schema).to_pandas()

    # TODO: ensure deterministic
    token = dask.base.tokenize(fs, bound_row_filter, projected_schema,
                               projected_field_ids, case_sensitive)
    name = f'from-iceberg-{token}'
    dsk = {
        (name, i): (
            _file_to_pandas, fs, task, bound_row_filter, projected_schema,
            projected_field_ids, case_sensitive,
        )
        for i, task in enumerate(tasks)
    }
    # Dask expects npartitions + 1 division boundaries; None marks them unknown
    divisions = [None] * (len(dsk) + 1)
    df = dd.DataFrame(dsk, name, meta, divisions)
    return df
```
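For context, here's roughly how I'm calling it. The catalog name, table identifier, and filter column below are placeholders, so treat this as a sketch rather than a tested example:
```python
# Placeholder catalog / table / column names -- adjust for your setup.
catalog = load_catalog("default")
tbl = catalog.load_table("examples.events")

# Scan with an example filter, then convert.
scan = tbl.scan(row_filter=GreaterThanOrEqual("id", 100))
df = to_dask_dataframe(scan)

# One Dask partition per file scan task (see note 1 below).
assert df.npartitions == len(list(scan.plan_files()))
df.head()
```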
It seems to work, but I haven't tested it beyond a basic `df.head()`. A
couple notes:
1. This returns a Dask DataFrame with a single Dask partition per file scan
task from `.plan_files()`, which is maybe equal to the number of Parquet files?
2. Currently, `divisions` is set to all `None`, which is sub-optimal (xref
https://docs.dask.org/en/stable/dataframe-design.html?highlight=divisions#partitions and
https://docs.dask.org/en/stable/dataframe-parquet.html?highlight=divisions#calculating-divisions).
I'm seeing some fields on `DataFile` that might help with setting those
properly; there's a rough sketch after this list.
3. I've never used (py)iceberg before, but I was pleasantly surprised by
how straightforward this was. Most of it is copy-pasted bits and pieces of
`pyiceberg.io.pyarrow`; with a proper refactor, it would be even fewer net new
lines of code.
4. On the Dask side, we would be interested in making this more
sophisticated, so that operations on the Dask DataFrame can affect the original
scan (https://github.com/dask/dask/issues/9970 and linked issues).
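For note 2, here's a rough sketch of what computing `divisions` could look like once there's a way to pull a per-file min/max for the index column out of each task's `DataFile` statistics. The `file_min_max` helper is hypothetical (decoding the bound stats is the part I haven't worked out); the Dask-facing part is simply that divisions are the sorted partition boundaries, with `npartitions + 1` entries:
```python
def file_min_max(task, field_id):
    """Hypothetical helper: return (min, max) of the index column for one
    file scan task, e.g. by decoding the DataFile lower/upper bound stats."""
    raise NotImplementedError


def compute_divisions(tasks, field_id):
    """Build Dask divisions from per-file min/max statistics.

    Assumes the tasks are ordered by the index column; falls back to
    unknown divisions if the per-file ranges overlap.
    """
    if not tasks:
        return [None]
    bounds = [file_min_max(task, field_id) for task in tasks]
    mins = [lo for lo, _ in bounds]
    maxes = [hi for _, hi in bounds]
    if any(mins[i + 1] < maxes[i] for i in range(len(bounds) - 1)):
        # overlapping ranges: give up and report unknown divisions
        return [None] * (len(tasks) + 1)
    # npartitions + 1 boundaries: each partition's minimum, plus the last maximum
    return mins + [maxes[-1]]
```
The per-partition frames would also need their index set to that column (e.g. a `set_index` inside `_file_to_pandas`) for Dask to actually take advantage of the divisions.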