JonasJ-ap opened a new pull request, #6997:
URL: https://github.com/apache/iceberg/pull/6997
## Problem Addressed:
This PR fixes #6505 and #6647.
It adds support for inferring the Iceberg schema from the Parquet schema when
the Parquet file does not contain metadata holding the encoded Iceberg schema.
This PR is still under development. I am posting it early to get initial
feedback on the overall design and structure. Thank you in advance
for your help.
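The fallback described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the metadata key `b"iceberg.schema"`, the helper name `resolve_file_schema`, and the sample field ID are assumptions made for the example, and the inference step is passed in as a callable so the sketch stays independent of any Parquet library.

```python
import json
from typing import Callable, Mapping, Optional

# Assumed key under which a writer embeds the Iceberg schema as JSON.
ICEBERG_SCHEMA_KEY = b"iceberg.schema"


def resolve_file_schema(
    parquet_metadata: Optional[Mapping[bytes, bytes]],
    infer_from_parquet: Callable[[], dict],
) -> dict:
    """Return the embedded Iceberg schema if present, otherwise infer one."""
    if parquet_metadata and ICEBERG_SCHEMA_KEY in parquet_metadata:
        # The writer stored the schema in the file's key-value metadata.
        return json.loads(parquet_metadata[ICEBERG_SCHEMA_KEY])
    # No embedded schema (for example, a file rewritten by another engine):
    # fall back to deriving the schema from the Parquet schema itself.
    return infer_from_parquet()


# Hypothetical inputs for illustration only.
embedded = {ICEBERG_SCHEMA_KEY: b'{"type": "struct", "fields": []}'}
inferred = {
    "type": "struct",
    "fields": [{"id": 1, "name": "longCol", "required": False, "type": "long"}],
}

print(resolve_file_schema(embedded, lambda: inferred))  # uses the embedded schema
print(resolve_file_schema(None, lambda: inferred))      # falls back to inference
```

Before this change, the second case is where the reader raised a `ValueError` instead of falling back.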
## Tests:
Working on unit tests.
Manual test on AWS Athena (reproducing the bug with the procedure in #6505 and
confirming the fix):
1. Create a table on AWS Glue:
```scala
import org.apache.spark.sql.functions.{current_date, date_add, expr}

// One column per type; DB_NAME is assumed to be defined earlier in the session.
val type_frame = spark
  .range(0, 5, 1, 5)
  .withColumnRenamed("id", "longCol")
  .withColumn("intCol", expr("CAST(longCol AS INT)"))
  .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
  .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
  .withColumn("dateCol", date_add(current_date(), 1))
  .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
  .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
  .withColumn("booleanCol", expr("longCol > 5"))
  .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
  .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
  .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
  .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
  .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
  .withColumn("arrayCol", expr("ARRAY(longCol)"))
  .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))

type_frame
  .writeTo(s"demo.$DB_NAME.type_test_ref_unpartitioned2")
  .tableProperty("format-version", "2")
  .createOrReplace()
```
2. Use AWS Athena to optimize the table:
```sql
OPTIMIZE type_test_ref_unpartitioned REWRITE DATA USING BIN_PACK;
```
3. Running the following code snippet raises an error:
```python
from pyiceberg.catalog import load_catalog
catalog = load_catalog("default", warehouse="s3://gluetestjonas/warehouse")
table = catalog.load_table("iceberg_ref.type_test_ref_unpartitioned")
df = table.scan().to_arrow()
print(df)
```
On the current master branch:
```bash
issue_6505 python3.10 parquet_schema.py
Traceback (most recent call last):
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/issue_6505/parquet_schema.py", line 6, in <module>
    df = table.scan().to_arrow()
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/iceberg_python/python/pyiceberg/table/__init__.py", line 404, in to_arrow
    return project_table(
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/iceberg_python/python/pyiceberg/io/pyarrow.py", line 777, in project_table
    for table in pool.starmap(
  File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/iceberg_python/python/pyiceberg/io/pyarrow.py", line 714, in _file_to_table
    raise ValueError(
ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505
```
On this PR:
```bash
issue_6505 python3.10 parquet_schema.py
pyarrow.Table
longCol: int64
intCol: int32
floatCol: float
doubleCol: double
dateCol: date32[day]
timestampCol: timestamp[us, tz=UTC]
stringCol: string
booleanCol: bool
binaryCol: binary
byteCol: int32
decimalCol: decimal128(10, 2)
shortCol: int32
mapCol: map<int64, decimal128(10, 2)>
  child 0, entries: struct<key: int64 not null, value: decimal128(10, 2)> not null
      child 0, key: int64 not null
      child 1, value: decimal128(10, 2)
arrayCol: list<item: int64>
  child 0, item: int64
structCol: struct<mapCol: map<int64, decimal128(10, 2)>, arrayCol: list<item: int64>>
  child 0, mapCol: map<int64, decimal128(10, 2)>
      child 0, entries: struct<key: int64 not null, value: decimal128(10, 2)> not null
          child 0, key: int64 not null
          child 1, value: decimal128(10, 2)
  child 1, arrayCol: list<item: int64>
      child 0, item: int64
----
longCol: [[3,0,2,1,4]]
intCol: [[3,0,2,1,4]]
floatCol: [[3,0,2,1,4]]
doubleCol: [[3,0,2,1,4]]
dateCol: [[2023-03-04,2023-03-04,2023-03-04,2023-03-04,2023-03-04]]
timestampCol: [[2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000]]
stringCol: [["2023-03-04","2023-03-04","2023-03-04","2023-03-04","2023-03-04"]]
booleanCol: [[false,false,false,false,false]]
binaryCol: [[0000000000000003,0000000000000000,0000000000000002,0000000000000001,0000000000000004]]
byteCol: [[3,0,2,1,4]]
```
This indicates that the table can now be read normally.
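To make the type correspondence in that output easier to follow, here is a small sketch that maps the Arrow type strings printed above to Iceberg primitive type names. It is an illustration only, not the conversion implemented in this PR: the function name `arrow_to_iceberg` is hypothetical, it works on the printed type strings rather than real Arrow types, and it covers only the primitive types that appear in this test table. Note that `byteCol` and `shortCol` come back as `int32`, which fits Iceberg having only `int` and `long` integer types.

```python
import re

# Arrow type strings (as printed by pyarrow) -> Iceberg primitive type names.
_SIMPLE_TYPES = {
    "bool": "boolean",
    "int32": "int",
    "int64": "long",
    "float": "float",
    "double": "double",
    "date32[day]": "date",
    "string": "string",
    "binary": "binary",
}
_DECIMAL = re.compile(r"decimal128\((\d+), (\d+)\)")
_TIMESTAMP = re.compile(r"timestamp\[us(, tz=([^\]]+))?\]")


def arrow_to_iceberg(arrow_type: str) -> str:
    """Map a printed Arrow primitive type to an Iceberg type name (sketch)."""
    if arrow_type in _SIMPLE_TYPES:
        return _SIMPLE_TYPES[arrow_type]
    match = _DECIMAL.fullmatch(arrow_type)
    if match:
        # Precision and scale carry over unchanged.
        return f"decimal({match.group(1)}, {match.group(2)})"
    match = _TIMESTAMP.fullmatch(arrow_type)
    if match:
        # A timezone on the Arrow side corresponds to Iceberg's timestamptz.
        return "timestamptz" if match.group(2) else "timestamp"
    raise ValueError(f"unsupported Arrow type in this sketch: {arrow_type}")


for printed in ["int64", "int32", "date32[day]", "timestamp[us, tz=UTC]", "decimal128(10, 2)"]:
    print(printed, "->", arrow_to_iceberg(printed))
```

Nested types (`map`, `list`, `struct`) are left out here because they also require assigning field IDs, which is a design question of its own.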
`.pyiceberg.yaml`:
```yaml
catalog:
  default:
    type: glue
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.