This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
     new 9189cb3  Refactor Arrow schema conversion (#117)
9189cb3 is described below

commit 9189cb3608d8a3b169351224244063c6d1670018
Author: Fokko Driesprong <fo...@tabular.io>
AuthorDate: Fri Nov 3 16:52:02 2023 +0100

    Refactor Arrow schema conversion (#117)

    * Refactor schema conversion

      We wrapped a schema in a schema.

    * Use UTF8 constant

    * Moar constants
---
 pyiceberg/avro/decoder.py    |  3 ++-
 pyiceberg/avro/encoder.py    |  3 ++-
 pyiceberg/catalog/rest.py    | 10 +++++-----
 pyiceberg/conversions.py     |  8 ++++----
 pyiceberg/io/pyarrow.py      | 13 +++++++++----
 pyiceberg/serializers.py     |  7 ++++---
 pyiceberg/typedef.py         |  2 ++
 pyiceberg/utils/config.py    |  4 ++--
 tests/catalog/test_hive.py   |  3 ++-
 tests/conftest.py            |  3 ++-
 tests/io/test_pyarrow.py     | 20 +++++++++++---------
 tests/table/test_metadata.py |  3 ++-
 tests/test_transforms.py     |  5 +++--
 tests/utils/test_config.py   |  4 ++--
 14 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/pyiceberg/avro/decoder.py b/pyiceberg/avro/decoder.py
index ab78136..708392a 100644
--- a/pyiceberg/avro/decoder.py
+++ b/pyiceberg/avro/decoder.py
@@ -27,6 +27,7 @@ from typing import (
 
 from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
 from pyiceberg.io import InputStream
+from pyiceberg.typedef import UTF8
 
 
 class BinaryDecoder(ABC):
@@ -107,7 +108,7 @@ class BinaryDecoder(ABC):
         A string is encoded as a long followed by that many bytes
         of UTF-8 encoded character data.
         """
-        return self.read_bytes().decode("utf-8")
+        return self.read_bytes().decode(UTF8)
 
     def skip_boolean(self) -> None:
         self.skip(1)
diff --git a/pyiceberg/avro/encoder.py b/pyiceberg/avro/encoder.py
index 238d5d6..755627e 100644
--- a/pyiceberg/avro/encoder.py
+++ b/pyiceberg/avro/encoder.py
@@ -18,6 +18,7 @@ from uuid import UUID
 
 from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
 from pyiceberg.io import OutputStream
+from pyiceberg.typedef import UTF8
 
 
 class BinaryEncoder:
@@ -62,7 +63,7 @@ class BinaryEncoder:
 
     def write_utf8(self, s: str) -> None:
         """Encode a string as a long followed by that many bytes of UTF-8 encoded character data."""
-        self.write_bytes(s.encode("utf-8"))
+        self.write_bytes(s.encode(UTF8))
 
     def write_uuid(self, uuid: UUID) -> None:
         """Write UUID as a fixed[16].
diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 20875d3..77025b5 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -66,7 +66,7 @@ from pyiceberg.table import (
     TableMetadata,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
-from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
+from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel
 
 ICEBERG_REST_SPEC_VERSION = "0.14.1"
 
@@ -110,7 +110,7 @@ SIGV4 = "rest.sigv4-enabled"
 SIGV4_REGION = "rest.signing-region"
 SIGV4_SERVICE = "rest.signing-name"
 
-NAMESPACE_SEPARATOR = b"\x1F".decode("UTF-8")
+NAMESPACE_SEPARATOR = b"\x1F".decode(UTF8)
 
 
 class TableResponse(IcebergBaseModel):
@@ -444,7 +444,7 @@ class RestCatalog(Catalog):
             write_order=sort_order,
             properties=properties,
         )
-        serialized_json = request.model_dump_json().encode("utf-8")
+        serialized_json = request.model_dump_json().encode(UTF8)
         response = self._session.post(
             self.url(Endpoints.create_table, namespace=namespace_and_table["namespace"]),
             data=serialized_json,
@@ -475,7 +475,7 @@ class RestCatalog(Catalog):
             name=namespace_and_table["table"],
             metadata_location=metadata_location,
         )
-        serialized_json = request.model_dump_json().encode("utf-8")
+        serialized_json = request.model_dump_json().encode(UTF8)
         response = self._session.post(
             self.url(Endpoints.register_table, namespace=namespace_and_table["namespace"]),
             data=serialized_json,
@@ -552,7 +552,7 @@ class RestCatalog(Catalog):
         """
         response = self._session.post(
             self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
-            data=table_request.model_dump_json().encode("utf-8"),
+            data=table_request.model_dump_json().encode(UTF8),
         )
         try:
             response.raise_for_status()
diff --git a/pyiceberg/conversions.py b/pyiceberg/conversions.py
index aecd43a..f2590b1 100644
--- a/pyiceberg/conversions.py
+++ b/pyiceberg/conversions.py
@@ -39,7 +39,7 @@ from typing import (
     Union,
 )
 
-from pyiceberg.typedef import L
+from pyiceberg.typedef import UTF8, L
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -143,7 +143,7 @@ def _(_: UUIDType, value_str: str) -> uuid.UUID:
 @partition_to_py.register(BinaryType)
 @handle_none
 def _(_: PrimitiveType, value_str: str) -> bytes:
-    return bytes(value_str, "UTF-8")
+    return bytes(value_str, UTF8)
 
 
 @partition_to_py.register(DecimalType)
@@ -223,7 +223,7 @@ def _(_: DoubleType, value: float) -> bytes:
 
 @to_bytes.register(StringType)
 def _(_: StringType, value: str) -> bytes:
-    return value.encode("UTF-8")
+    return value.encode(UTF8)
 
 
 @to_bytes.register(UUIDType)
@@ -308,7 +308,7 @@ def _(_: DoubleType, b: bytes) -> float:
 
 @from_bytes.register(StringType)
 def _(_: StringType, b: bytes) -> str:
-    return bytes(b).decode("utf-8")
+    return bytes(b).decode(UTF8)
 
 
 @from_bytes.register(BinaryType)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 59bad81..75049d8 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -435,13 +435,18 @@ class PyArrowFileIO(FileIO):
             raise  # pragma: no cover - If some other kind of OSError, raise the raw error
 
 
-def schema_to_pyarrow(schema: Union[Schema, IcebergType]) -> pa.schema:
-    return visit(schema, _ConvertToArrowSchema())
+def schema_to_pyarrow(schema: Union[Schema, IcebergType], metadata: Dict[bytes, bytes] = EMPTY_DICT) -> pa.schema:
+    return visit(schema, _ConvertToArrowSchema(metadata))
 
 
-class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType], Singleton):
+class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
+    _metadata: Dict[bytes, bytes]
+
+    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT) -> None:
+        self._metadata = metadata
+
     def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
-        return pa.schema(list(struct_result))
+        return pa.schema(list(struct_result), metadata=self._metadata)
 
     def struct(self, _: StructType, field_results: List[pa.DataType]) -> pa.DataType:
         return pa.struct(field_results)
diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py
index 65d4c75..6a580ea 100644
--- a/pyiceberg/serializers.py
+++ b/pyiceberg/serializers.py
@@ -23,6 +23,7 @@ from typing import Callable
 
 from pyiceberg.io import InputFile, InputStream, OutputFile
 from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
+from pyiceberg.typedef import UTF8
 
 GZIP = "gzip"
 
@@ -76,7 +77,7 @@ class FromByteStream:
 
     @staticmethod
     def table_metadata(
-        byte_stream: InputStream, encoding: str = "utf-8", compression: Compressor = NOOP_COMPRESSOR
+        byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
     ) -> TableMetadata:
         """Instantiate a TableMetadata object from a byte stream.
 
@@ -97,7 +98,7 @@ class FromInputFile:
     """A collection of methods that deserialize InputFiles into Iceberg objects."""
 
     @staticmethod
-    def table_metadata(input_file: InputFile, encoding: str = "utf-8") -> TableMetadata:
+    def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
         """Create a TableMetadata instance from an input file.
 
         Args:
@@ -126,6 +127,6 @@ class ToOutputFile:
             overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
         """
         with output_file.create(overwrite=overwrite) as output_stream:
-            json_bytes = metadata.model_dump_json().encode("utf-8")
+            json_bytes = metadata.model_dump_json().encode(UTF8)
             json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
             output_stream.write(json_bytes)
diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py
index ff2a6d1..56a3d3c 100644
--- a/pyiceberg/typedef.py
+++ b/pyiceberg/typedef.py
@@ -51,6 +51,8 @@ class FrozenDict(Dict[Any, Any]):
         raise AttributeError("FrozenDict does not support .update()")
 
 
+UTF8 = 'utf-8'
+
 EMPTY_DICT = FrozenDict()
 
 K = TypeVar("K")
diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py
index 7ca3382..31ba0b3 100644
--- a/pyiceberg/utils/config.py
+++ b/pyiceberg/utils/config.py
@@ -20,7 +20,7 @@ from typing import List, Optional
 
 import strictyaml
 
-from pyiceberg.typedef import FrozenDict, RecursiveDict
+from pyiceberg.typedef import UTF8, FrozenDict, RecursiveDict
 
 PYICEBERG = "pyiceberg_"
 DEFAULT = "default"
@@ -76,7 +76,7 @@ class Config:
         if directory:
             path = os.path.join(directory, PYICEBERG_YML)
             if os.path.isfile(path):
-                with open(path, encoding="utf-8") as f:
+                with open(path, encoding=UTF8) as f:
                     yml_str = f.read()
                     file_config = strictyaml.load(yml_str).data
                     file_config_lowercase = _lowercase_dictionary_keys(file_config)
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index 9e3dd1a..c280146 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -58,6 +58,7 @@ from pyiceberg.table.sorting import (
     SortOrder,
 )
 from pyiceberg.transforms import BucketTransform, IdentityTransform
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BooleanType,
     IntegerType,
@@ -253,7 +254,7 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
         )
     )
 
-    with open(metadata_location, encoding="utf-8") as f:
+    with open(metadata_location, encoding=UTF8) as f:
         payload = f.read()
 
     metadata = TableMetadataUtil.parse_raw(payload)
diff --git a/tests/conftest.py b/tests/conftest.py
index 79c01dc..37ca1e7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -74,6 +74,7 @@ from pyiceberg.schema import Accessor, Schema
 from pyiceberg.serializers import ToOutputFile
 from pyiceberg.table import FileScanTask, Table
 from pyiceberg.table.metadata import TableMetadataV2
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1456,7 +1457,7 @@ class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
     @property
     def raw_headers(self) -> aiohttp.typedefs.RawHeaders:
         # Return the headers encoded the way that aiobotocore expects them
-        return {k.encode("utf-8"): str(v).encode("utf-8") for k, v in self.response.headers.items()}.items()
+        return {k.encode(UTF8): str(v).encode(UTF8) for k, v in self.response.headers.items()}.items()
 
 
 def patch_aiobotocore() -> None:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 7174c91..4a476f2 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -56,6 +56,7 @@ from pyiceberg.expressions import (
 )
 from pyiceberg.io import InputStream, OutputStream, load_file_io
 from pyiceberg.io.pyarrow import (
+    ICEBERG_SCHEMA,
     PyArrowFile,
     PyArrowFileIO,
     _ConvertToArrowSchema,
@@ -69,6 +70,7 @@ from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema, make_compatible_name, visit
 from pyiceberg.table import FileScanTask, Table
 from pyiceberg.table.metadata import TableMetadataV2
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -708,7 +710,7 @@ def _write_table_to_file(filepath: str, schema: pa.Schema, table: pa.Table) -> s
 
 @pytest.fixture
 def file_int(schema_int: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int), metadata={"iceberg.schema": schema_int.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_int, metadata={ICEBERG_SCHEMA: bytes(schema_int.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/a.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema)
     )
@@ -716,7 +718,7 @@ def file_int(schema_int: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_int_str(schema_int_str: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_int_str), metadata={"iceberg.schema": schema_int_str.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_int_str, metadata={ICEBERG_SCHEMA: bytes(schema_int_str.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/a.parquet",
         pyarrow_schema,
@@ -726,7 +728,7 @@ def file_int_str(schema_int_str: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_string(schema_str: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_str), metadata={"iceberg.schema": schema_str.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_str, metadata={ICEBERG_SCHEMA: bytes(schema_str.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/b.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array(["0", "1", "2"])], schema=pyarrow_schema)
     )
@@ -734,7 +736,7 @@ def file_string(schema_str: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_long(schema_long: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_long), metadata={"iceberg.schema": schema_long.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_long, metadata={ICEBERG_SCHEMA: bytes(schema_long.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/c.parquet", pyarrow_schema, pa.Table.from_arrays([pa.array([0, 1, 2])], schema=pyarrow_schema)
     )
@@ -742,7 +744,7 @@ def file_long(schema_long: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_struct(schema_struct: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_struct), metadata={"iceberg.schema": schema_struct.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_struct, metadata={ICEBERG_SCHEMA: bytes(schema_struct.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/d.parquet",
         pyarrow_schema,
@@ -759,7 +761,7 @@ def file_struct(schema_struct: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_list(schema_list: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_list), metadata={"iceberg.schema": schema_list.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_list, metadata={ICEBERG_SCHEMA: bytes(schema_list.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/e.parquet",
         pyarrow_schema,
@@ -776,8 +778,8 @@ def file_list(schema_list: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(
-        schema_to_pyarrow(schema_list_of_structs), metadata={"iceberg.schema": schema_list_of_structs.model_dump_json()}
+    pyarrow_schema = schema_to_pyarrow(
+        schema_list_of_structs, metadata={ICEBERG_SCHEMA: bytes(schema_list_of_structs.model_dump_json(), UTF8)}
     )
     return _write_table_to_file(
         f"file:{tmpdir}/e.parquet",
@@ -795,7 +797,7 @@ def file_list_of_structs(schema_list_of_structs: Schema, tmpdir: str) -> str:
 
 @pytest.fixture
 def file_map(schema_map: Schema, tmpdir: str) -> str:
-    pyarrow_schema = pa.schema(schema_to_pyarrow(schema_map), metadata={"iceberg.schema": schema_map.model_dump_json()})
+    pyarrow_schema = schema_to_pyarrow(schema_map, metadata={ICEBERG_SCHEMA: bytes(schema_map.model_dump_json(), UTF8)})
     return _write_table_to_file(
         f"file:{tmpdir}/e.parquet",
         pyarrow_schema,
diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py
index 2273843..173e93e 100644
--- a/tests/table/test_metadata.py
+++ b/tests/table/test_metadata.py
@@ -39,6 +39,7 @@ from pyiceberg.table.metadata import (
 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
 from pyiceberg.table.sorting import NullOrder, SortDirection, SortField
 from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BooleanType,
     FloatType,
@@ -99,7 +100,7 @@ def test_from_dict_v2_parse_raw(example_table_metadata_v2: Dict[str, Any]) -> No
 
 def test_from_byte_stream(example_table_metadata_v2: Dict[str, Any]) -> None:
     """Test generating a TableMetadata instance from a file-like byte stream"""
-    data = bytes(json.dumps(example_table_metadata_v2), encoding="utf-8")
+    data = bytes(json.dumps(example_table_metadata_v2), encoding=UTF8)
     byte_stream = io.BytesIO(data)
     FromByteStream.table_metadata(byte_stream=byte_stream)
 
diff --git a/tests/test_transforms.py b/tests/test_transforms.py
index bcc9784..8bec844 100644
--- a/tests/test_transforms.py
+++ b/tests/test_transforms.py
@@ -74,6 +74,7 @@ from pyiceberg.transforms import (
     YearTransform,
     parse_transform,
 )
+from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -187,7 +188,7 @@ def test_bucket_method(type_var: PrimitiveType) -> None:
 
 def test_string_with_surrogate_pair() -> None:
     string_with_surrogate_pair = "string with a surrogate pair: 💰"
-    as_bytes = bytes(string_with_surrogate_pair, "UTF-8")
+    as_bytes = bytes(string_with_surrogate_pair, UTF8)
     bucket_transform = BucketTransform(100).transform(StringType(), bucket=False)
     assert bucket_transform(string_with_surrogate_pair) == mmh3.hash(as_bytes)
 
@@ -392,7 +393,7 @@ def test_truncate_string(input_var: str, expected: str) -> None:
     "type_var,value,expected_human_str,expected",
     [
         (BinaryType(), b"\x00\x01\x02\x03", "AAECAw==", b"\x00"),
-        (BinaryType(), bytes("\u2603de", "utf-8"), "4piDZGU=", b"\xe2"),
+        (BinaryType(), bytes("\u2603de", UTF8), "4piDZGU=", b"\xe2"),
         (DecimalType(8, 5), Decimal("14.21"), "14.21", Decimal("14.21")),
         (IntegerType(), 123, "123", 123),
         (LongType(), 123, "123", 123),
diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py
index 11c3076..d694e15 100644
--- a/tests/utils/test_config.py
+++ b/tests/utils/test_config.py
@@ -20,7 +20,7 @@ from unittest import mock
 import pytest
 from strictyaml import as_document
 
-from pyiceberg.typedef import RecursiveDict
+from pyiceberg.typedef import UTF8, RecursiveDict
 from pyiceberg.utils.config import Config, _lowercase_dictionary_keys, merge_config
 
 EXAMPLE_ENV = {"PYICEBERG_CATALOG__PRODUCTION__URI": "https://service.io/api"}
@@ -43,7 +43,7 @@ def test_from_environment_variables_uppercase() -> None:
 
 def test_from_configuration_files(tmp_path_factory: pytest.TempPathFactory) -> None:
     config_path = str(tmp_path_factory.mktemp("config"))
-    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding="utf-8") as file:
+    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file:
        yaml_str = as_document({"catalog": {"production": {"uri": "https://service.io/api"}}}).as_yaml()
        file.write(yaml_str)
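
For context, the net effect at call sites, as a minimal sketch: the one-field Schema below is illustrative only, while schema_to_pyarrow, ICEBERG_SCHEMA, and UTF8 are the names added or touched by this commit.

import pyarrow as pa

from pyiceberg.io.pyarrow import ICEBERG_SCHEMA, schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.typedef import UTF8
from pyiceberg.types import NestedField, StringType

# Illustrative one-field schema (not part of the commit).
schema = Schema(NestedField(field_id=1, name="name", field_type=StringType(), required=True))

# Before: the converted pa.schema was wrapped in a second pa.schema
# purely to attach the Iceberg schema JSON as metadata ("a schema in a schema").
old_style = pa.schema(schema_to_pyarrow(schema), metadata={"iceberg.schema": schema.model_dump_json()})

# After: the metadata flows through the visitor in a single call, and the
# UTF8 constant replaces the scattered "utf-8" string literals.
new_style = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)})

Dropping the Singleton mixin from _ConvertToArrowSchema is what makes this possible: the visitor now carries per-call metadata state, so a shared instance would no longer be safe.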