This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9189cb3  Refactor Arrow schema conversion (#117)
9189cb3 is described below

commit 9189cb3608d8a3b169351224244063c6d1670018
Author: Fokko Driesprong <fo...@tabular.io>
AuthorDate: Fri Nov 3 16:52:02 2023 +0100

    Refactor Arrow schema conversion (#117)
    
    * Refactor schema conversion
    
    We wrapped a schema in a schema.
    
    * Use UTF8 constant
    
    * Moar constants
---
 pyiceberg/avro/decoder.py    |  3 ++-
 pyiceberg/avro/encoder.py    |  3 ++-
 pyiceberg/catalog/rest.py    | 10 +++++-----
 pyiceberg/conversions.py     |  8 ++++----
 pyiceberg/io/pyarrow.py      | 13 +++++++++----
 pyiceberg/serializers.py     |  7 ++++---
 pyiceberg/typedef.py         |  2 ++
 pyiceberg/utils/config.py    |  4 ++--
 tests/catalog/test_hive.py   |  3 ++-
 tests/conftest.py            |  3 ++-
 tests/io/test_pyarrow.py     | 20 +++++++++++---------
 tests/table/test_metadata.py |  3 ++-
 tests/test_transforms.py     |  5 +++--
 tests/utils/test_config.py   |  4 ++--
 14 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/pyiceberg/avro/decoder.py b/pyiceberg/avro/decoder.py
index ab78136..708392a 100644
--- a/pyiceberg/avro/decoder.py
+++ b/pyiceberg/avro/decoder.py
@@ -27,6 +27,7 @@ from typing import (
 
 from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
 from pyiceberg.io import InputStream
+from pyiceberg.typedef import UTF8
 
 
 class BinaryDecoder(ABC):
@@ -107,7 +108,7 @@ class BinaryDecoder(ABC):
         A string is encoded as a long followed by
         that many bytes of UTF-8 encoded character data.
         """
-        return self.read_bytes().decode("utf-8")
+        return self.read_bytes().decode(UTF8)
 
     def skip_boolean(self) -> None:
         self.skip(1)
diff --git a/pyiceberg/avro/encoder.py b/pyiceberg/avro/encoder.py
index 238d5d6..755627e 100644
--- a/pyiceberg/avro/encoder.py
+++ b/pyiceberg/avro/encoder.py
@@ -18,6 +18,7 @@ from uuid import UUID
 
 from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
 from pyiceberg.io import OutputStream
+from pyiceberg.typedef import UTF8
 
 
 class BinaryEncoder:
@@ -62,7 +63,7 @@ class BinaryEncoder:
 
     def write_utf8(self, s: str) -> None:
        """Encode a string as a long followed by that many bytes of UTF-8 encoded character data."""
-        self.write_bytes(s.encode("utf-8"))
+        self.write_bytes(s.encode(UTF8))
 
     def write_uuid(self, uuid: UUID) -> None:
         """Write UUID as a fixed[16].
diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 20875d3..77025b5 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -66,7 +66,7 @@ from pyiceberg.table import (
     TableMetadata,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
-from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
+from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel
 
 ICEBERG_REST_SPEC_VERSION = "0.14.1"
 
@@ -110,7 +110,7 @@ SIGV4 = "rest.sigv4-enabled"
 SIGV4_REGION = "rest.signing-region"
 SIGV4_SERVICE = "rest.signing-name"
 
-NAMESPACE_SEPARATOR = b"\x1F".decode("UTF-8")
+NAMESPACE_SEPARATOR = b"\x1F".decode(UTF8)
 
 
 class TableResponse(IcebergBaseModel):
@@ -444,7 +444,7 @@ class RestCatalog(Catalog):
             write_order=sort_order,
             properties=properties,
         )
-        serialized_json = request.model_dump_json().encode("utf-8")
+        serialized_json = request.model_dump_json().encode(UTF8)
         response = self._session.post(
             self.url(Endpoints.create_table, namespace=namespace_and_table["namespace"]),
             data=serialized_json,
@@ -475,7 +475,7 @@ class RestCatalog(Catalog):
             name=namespace_and_table["table"],
             metadata_location=metadata_location,
         )
-        serialized_json = request.model_dump_json().encode("utf-8")
+        serialized_json = request.model_dump_json().encode(UTF8)
         response = self._session.post(
             self.url(Endpoints.register_table, namespace=namespace_and_table["namespace"]),
             data=serialized_json,
@@ -552,7 +552,7 @@ class RestCatalog(Catalog):
         """
         response = self._session.post(
             self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
-            data=table_request.model_dump_json().encode("utf-8"),
+            data=table_request.model_dump_json().encode(UTF8),
         )
         try:
             response.raise_for_status()
diff --git a/pyiceberg/conversions.py b/pyiceberg/conversions.py
index aecd43a..f2590b1 100644
--- a/pyiceberg/conversions.py
+++ b/pyiceberg/conversions.py
@@ -39,7 +39,7 @@ from typing import (
     Union,
 )
 
-from pyiceberg.typedef import L
+from pyiceberg.typedef import UTF8, L
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -143,7 +143,7 @@ def _(_: UUIDType, value_str: str) -> uuid.UUID:
 @partition_to_py.register(BinaryType)
 @handle_none
 def _(_: PrimitiveType, value_str: str) -> bytes:
-    return bytes(value_str, "UTF-8")
+    return bytes(value_str, UTF8)
 
 
 @partition_to_py.register(DecimalType)
@@ -223,7 +223,7 @@ def _(_: DoubleType, value: float) -> bytes:
 
 @to_bytes.register(StringType)
 def _(_: StringType, value: str) -> bytes:
-    return value.encode("UTF-8")
+    return value.encode(UTF8)
 
 
 @to_bytes.register(UUIDType)
@@ -308,7 +308,7 @@ def _(_: DoubleType, b: bytes) -> float:
 
 @from_bytes.register(StringType)
 def _(_: StringType, b: bytes) -> str:
-    return bytes(b).decode("utf-8")
+    return bytes(b).decode(UTF8)
 
 
 @from_bytes.register(BinaryType)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 59bad81..75049d8 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -435,13 +435,18 @@ class PyArrowFileIO(FileIO):
             raise  # pragma: no cover - If some other kind of OSError, raise the raw error
 
 
-def schema_to_pyarrow(schema: Union[Schema, IcebergType]) -> pa.schema:
-    return visit(schema, _ConvertToArrowSchema())
+def schema_to_pyarrow(schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT) -> pa.schema:
+    return visit(schema, _ConvertToArrowSchema(metadata))
 
 
-class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType], Singleton):
+class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
+    _metadata: Dict[bytes, bytes]
+
+    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT) -> None:
+        self._metadata = metadata
+
     def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
-        return pa.schema(list(struct_result))
+        return pa.schema(list(struct_result), metadata=self._metadata)
 
     def struct(self, _: StructType, field_results: List[pa.DataType]) -> pa.DataType:
         return pa.struct(field_results)
diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py
index 65d4c75..6a580ea 100644
--- a/pyiceberg/serializers.py
+++ b/pyiceberg/serializers.py
@@ -23,6 +23,7 @@ from typing import Callable
 
 from pyiceberg.io import InputFile, InputStream, OutputFile
 from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
+from pyiceberg.typedef import UTF8
 
 GZIP = "gzip"
 
@@ -76,7 +77,7 @@ class FromByteStream:
 
     @staticmethod
     def table_metadata(
-        byte_stream: InputStream, encoding: str = "utf-8", compression: Compressor = NOOP_COMPRESSOR
+        byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
     ) -> TableMetadata:
         """Instantiate a TableMetadata object from a byte stream.
 
@@ -97,7 +98,7 @@ class FromInputFile:
     """A collection of methods that deserialize InputFiles into Iceberg objects."""
 
     @staticmethod
-    def table_metadata(input_file: InputFile, encoding: str = "utf-8") -> TableMetadata:
+    def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
         """Create a TableMetadata instance from an input file.
 
         Args:
@@ -126,6 +127,6 @@ class ToOutputFile:
             overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
         """
         with output_file.create(overwrite=overwrite) as output_stream:
-            json_bytes = metadata.model_dump_json().encode("utf-8")
+            json_bytes = metadata.model_dump_json().encode(UTF8)
             json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
             output_stream.write(json_bytes)
diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py
index ff2a6d1..56a3d3c 100644
--- a/pyiceberg/typedef.py
+++ b/pyiceberg/typedef.py
@@ -51,6 +51,8 @@ class FrozenDict(Dict[Any, Any]):
         raise AttributeError("FrozenDict does not support .update()")
 
 
+UTF8 = 'utf-8'
+
 EMPTY_DICT = FrozenDict()
 
 K = TypeVar("K")
diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py
index 7ca3382..31ba0b3 100644
--- a/pyiceberg/utils/config.py
+++ b/pyiceberg/utils/config.py
@@ -20,7 +20,7 @@ from typing import List, Optional
 
 import strictyaml
 
-from pyiceberg.typedef import FrozenDict, RecursiveDict
+from pyiceberg.typedef import UTF8, FrozenDict, RecursiveDict
 
 PYICEBERG = "pyiceberg_"
 DEFAULT = "default"
@@ -76,7 +76,7 @@ class Config:
             if directory:
                 path = os.path.join(directory, PYICEBERG_YML)
                 if os.path.isfile(path):
-                    with open(path, encoding="utf-8") as f:
+                    with open(path, encoding=UTF8) as f:
                         yml_str = f.read()
                     file_config = strictyaml.load(yml_str).data
                     file_config_lowercase = _lowercase_dictionary_keys(file_config)
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index 9e3dd1a..c280146 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -58,6 +58,7 @@ from pyiceberg.table.sorting import (
     SortOrder,
 )
 from pyiceberg.transforms import BucketTransform, IdentityTransform
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BooleanType,
     IntegerType,
@@ -253,7 +254,7 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
         )
     )
 
-    with open(metadata_location, encoding="utf-8") as f:
+    with open(metadata_location, encoding=UTF8) as f:
         payload = f.read()
 
     metadata = TableMetadataUtil.parse_raw(payload)
diff --git a/tests/conftest.py b/tests/conftest.py
index 79c01dc..37ca1e7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -74,6 +74,7 @@ from pyiceberg.schema import Accessor, Schema
 from pyiceberg.serializers import ToOutputFile
 from pyiceberg.table import FileScanTask, Table
 from pyiceberg.table.metadata import TableMetadataV2
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1456,7 +1457,7 @@ class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
     @property
     def raw_headers(self) -> aiohttp.typedefs.RawHeaders:
         # Return the headers encoded the way that aiobotocore expects them
-        return {k.encode("utf-8"): str(v).encode("utf-8") for k, v in self.response.headers.items()}.items()
+        return {k.encode(UTF8): str(v).encode(UTF8) for k, v in self.response.headers.items()}.items()
 
 
 def patch_aiobotocore() -> None:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 7174c91..4a476f2 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -56,6 +56,7 @@ from pyiceberg.expressions import (
 )
 from pyiceberg.io import InputStream, OutputStream, load_file_io
 from pyiceberg.io.pyarrow import (
+    ICEBERG_SCHEMA,
     PyArrowFile,
     PyArrowFileIO,
     _ConvertToArrowSchema,
@@ -69,6 +70,7 @@ from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema, make_compatible_name, visit
 from pyiceberg.table import FileScanTask, Table
 from pyiceberg.table.metadata import TableMetadataV2
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -708,7 +710,7 @@ def _write_table_to_file(filepath: str, schema: pa.Schema, table: pa.Table) -> s
 
 @pytest.fixture
 def file_int(schema_int: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int), metadata={"iceberg.schema": schema_int.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_int, metadata={ICEBERG_SCHEMA: bytes(schema_int.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/a.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema)
     )
@@ -716,7 +718,7 @@ def file_int(schema_int: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_int_str(schema_int_str: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int_str), metadata={"iceberg.schema": schema_int_str.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_int_str, metadata={ICEBERG_SCHEMA: bytes(schema_int_str.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/a.parquet",
         pyarrow_schema,
@@ -726,7 +728,7 @@ def file_int_str(schema_int_str: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_string(schema_str: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_str), metadata={"iceberg.schema": schema_str.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_str, metadata={ICEBERG_SCHEMA: bytes(schema_str.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/b.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array(["0", "1", "2"])], schema=pyarrow_schema)
     )
@@ -734,7 +736,7 @@ def file_string(schema_str: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_long(schema_long: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_long), metadata={"iceberg.schema": schema_long.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_long, metadata={ICEBERG_SCHEMA: bytes(schema_long.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/c.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema)
     )
@@ -742,7 +744,7 @@ def file_long(schema_long: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_struct(schema_struct: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_struct), metadata={"iceberg.schema": schema_struct.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_struct, metadata={ICEBERG_SCHEMA: bytes(schema_struct.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/d.parquet",
         pyarrow_schema,
@@ -759,7 +761,7 @@ def file_struct(schema_struct: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_list(schema_list: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_list), metadata={"iceberg.schema": schema_list.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_list, metadata={ICEBERG_SCHEMA: bytes(schema_list.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/e.parquet",
         pyarrow_schema,
@@ -776,8 +778,8 @@ def file_list(schema_list: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(
-        schema_to_pyarrow(schema_list_of_structs), metadata={"iceberg.schema": schema_list_of_structs.model_dump_json()}
+    pyarrow_schema = schema_to_pyarrow(
+        schema_list_of_structs, metadata={ICEBERG_SCHEMA: bytes(schema_list_of_structs.model_dump_json(), UTF8)}
     )
     return _write_table_to_file(
         f"file:{tmpdir}/e.parquet",
@@ -795,7 +797,7 @@ def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_map(schema_map: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_map), metadata={"iceberg.schema": schema_map.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_map, metadata={ICEBERG_SCHEMA: bytes(schema_map.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/e.parquet",
         pyarrow_schema,
diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py
index 2273843..173e93e 100644
--- a/tests/table/test_metadata.py
+++ b/tests/table/test_metadata.py
@@ -39,6 +39,7 @@ from pyiceberg.table.metadata import (
 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
 from pyiceberg.table.sorting import NullOrder, SortDirection, SortField
 from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BooleanType,
     FloatType,
@@ -99,7 +100,7 @@ def test_from_dict_v2_parse_raw(example_table_metadata_v2: Dict[str, Any]) -> No
 
 def test_from_byte_stream(example_table_metadata_v2: Dict[str, Any]) -> None:
     """Test generating a TableMetadata instance from a file-like byte stream"""
-    data = bytes(json.dumps(example_table_metadata_v2), encoding="utf-8")
+    data = bytes(json.dumps(example_table_metadata_v2), encoding=UTF8)
     byte_stream = io.BytesIO(data)
     FromByteStream.table_metadata(byte_stream=byte_stream)
 
diff --git a/tests/test_transforms.py b/tests/test_transforms.py
index bcc9784..8bec844 100644
--- a/tests/test_transforms.py
+++ b/tests/test_transforms.py
@@ -74,6 +74,7 @@ from pyiceberg.transforms import (
     YearTransform,
     parse_transform,
 )
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -187,7 +188,7 @@ def test_bucket_method(type_var: PrimitiveType) -> None:
 
 def test_string_with_surrogate_pair() -> None:
     string_with_surrogate_pair = "string with a surrogate pair: 💰"
-    as_bytes = bytes(string_with_surrogate_pair, "UTF-8")
+    as_bytes = bytes(string_with_surrogate_pair, UTF8)
     bucket_transform = BucketTransform(100).transform(StringType(), bucket=False)
     assert bucket_transform(string_with_surrogate_pair) == mmh3.hash(as_bytes)
 
@@ -392,7 +393,7 @@ def test_truncate_string(input_var: str, expected: str) -> None:
     "type_var,value,expected_human_str,expected",
     [
         (BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"),
-        (BinaryType(), bytes("\u2603de", "utf-8"), "4piDZGU=", b"\xe2"),
+        (BinaryType(), bytes("\u2603de", UTF8), "4piDZGU=", b"\xe2"),
         (DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")),
         (IntegerType(), 123, "123", 123),
         (LongType(), 123, "123", 123),
diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py
index 11c3076..d694e15 100644
--- a/tests/utils/test_config.py
+++ b/tests/utils/test_config.py
@@ -20,7 +20,7 @@ from unittest import mock
 import pytest
 from strictyaml import as_document
 
-from pyiceberg.typedef import RecursiveDict
+from pyiceberg.typedef import UTF8, RecursiveDict
 from pyiceberg.utils.config import Config, _lowercase_dictionary_keys, merge_config
 
 EXAMPLE_ENV = {"PYICEBERG_CATALOG__PRODUCTION__URI": "https://service.io/api"}
@@ -43,7 +43,7 @@ def test_from_environment_variables_uppercase() -> None:
 
 def test_from_configuration_files(tmp_path_factory: pytest.TempPathFactory) -> None:
     config_path = str(tmp_path_factory.mktemp("config"))
-    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding="utf-8") as file:
+    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file:
         yaml_str = as_document({"catalog": {"production": {"uri": "https://service.io/api"}}}).as_yaml()
         file.write(yaml_str)
 

Reply via email to