This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 5a213212 Add unknown type (#1681)
5a213212 is described below
commit 5a21321282c08cd95553118d67247ebf2348bf02
Author: Kaushik Srinivasan <[email protected]>
AuthorDate: Wed Mar 12 17:16:43 2025 -0400
Add unknown type (#1681)
Support for new type `Unknown`, part of Iceberg V3 spec. Closes
#1553.
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
pyiceberg/avro/encoder.py | 4 ++++
pyiceberg/avro/reader.py | 8 ++++++++
pyiceberg/avro/resolver.py | 12 ++++++++++++
pyiceberg/avro/writer.py | 6 ++++++
pyiceberg/catalog/hive.py | 2 ++
pyiceberg/conversions.py | 12 ++++++++++++
pyiceberg/io/pyarrow.py | 9 +++++++++
pyiceberg/partitioning.py | 4 ++--
pyiceberg/schema.py | 17 ++++++++++++++--
pyiceberg/types.py | 20 ++++++++++++++++++-
pyiceberg/utils/schema_conversion.py | 9 +++++++--
tests/avro/test_reader.py | 11 +++--------
tests/avro/test_writer.py | 13 +++---------
tests/conftest.py | 2 +-
tests/integration/test_reads.py | 37 +++++++++++++++++++++++++++++++++++
tests/test_transforms.py | 26 ++++++++++++++++++++++++
tests/utils/test_schema_conversion.py | 15 +++++++-------
17 files changed, 174 insertions(+), 33 deletions(-)
diff --git a/pyiceberg/avro/encoder.py b/pyiceberg/avro/encoder.py
index 755627e7..899c65a1 100644
--- a/pyiceberg/avro/encoder.py
+++ b/pyiceberg/avro/encoder.py
@@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import Any
from uuid import UUID
from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
@@ -74,3 +75,6 @@ class BinaryEncoder:
if len(uuid.bytes) != 16:
raise ValueError(f"Expected UUID to have 16 bytes, got:
len({uuid.bytes!r})")
return self.write(uuid.bytes)
+
+ def write_unknown(self, _: Any) -> None:
+ """Nulls are written as 0 bytes in avro, so we do nothing."""
diff --git a/pyiceberg/avro/reader.py b/pyiceberg/avro/reader.py
index a5578680..21f5d807 100644
--- a/pyiceberg/avro/reader.py
+++ b/pyiceberg/avro/reader.py
@@ -201,6 +201,14 @@ class UUIDReader(Reader):
decoder.skip(16)
+class UnknownReader(Reader):
+ def read(self, decoder: BinaryDecoder) -> None:
+ return None
+
+ def skip(self, decoder: BinaryDecoder) -> None:
+ pass
+
+
@dataclass(frozen=True)
class FixedReader(Reader):
_len: int = dataclassfield()
diff --git a/pyiceberg/avro/resolver.py b/pyiceberg/avro/resolver.py
index 2a53f486..004af8bd 100644
--- a/pyiceberg/avro/resolver.py
+++ b/pyiceberg/avro/resolver.py
@@ -46,6 +46,7 @@ from pyiceberg.avro.reader import (
TimeReader,
TimestampReader,
TimestamptzReader,
+ UnknownReader,
UUIDReader,
)
from pyiceberg.avro.writer import (
@@ -66,6 +67,7 @@ from pyiceberg.avro.writer import (
TimestamptzWriter,
TimestampWriter,
TimeWriter,
+ UnknownWriter,
UUIDWriter,
Writer,
)
@@ -100,6 +102,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
@@ -193,6 +196,9 @@ class
ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]):
def visit_binary(self, binary_type: BinaryType) -> Writer:
return BinaryWriter()
+ def visit_unknown(self, unknown_type: UnknownType) -> Writer:
+ return UnknownWriter()
+
CONSTRUCT_WRITER_VISITOR = ConstructWriter()
@@ -341,6 +347,9 @@ class
WriteSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Writer]):
def visit_binary(self, binary_type: BinaryType, partner:
Optional[IcebergType]) -> Writer:
return BinaryWriter()
+ def visit_unknown(self, unknown_type: UnknownType, partner:
Optional[IcebergType]) -> Writer:
+ return UnknownWriter()
+
class ReadSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
__slots__ = ("read_types", "read_enums", "context")
@@ -471,6 +480,9 @@ class
ReadSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
def visit_binary(self, binary_type: BinaryType, partner:
Optional[IcebergType]) -> Reader:
return BinaryReader()
+ def visit_unknown(self, unknown_type: UnknownType, partner:
Optional[IcebergType]) -> Reader:
+ return UnknownReader()
+
class SchemaPartnerAccessor(PartnerAccessor[IcebergType]):
def schema_partner(self, partner: Optional[IcebergType]) ->
Optional[IcebergType]:
diff --git a/pyiceberg/avro/writer.py b/pyiceberg/avro/writer.py
index b53230d3..51d2d1bc 100644
--- a/pyiceberg/avro/writer.py
+++ b/pyiceberg/avro/writer.py
@@ -113,6 +113,12 @@ class UUIDWriter(Writer):
encoder.write(val.bytes)
+@dataclass(frozen=True)
+class UnknownWriter(Writer):
+ def write(self, encoder: BinaryEncoder, val: Any) -> None:
+ encoder.write_unknown(val)
+
+
@dataclass(frozen=True)
class FixedWriter(Writer):
_len: int = dataclassfield()
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 7189b12c..ec832727 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -110,6 +110,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
from pyiceberg.utils.properties import property_as_bool, property_as_float
@@ -236,6 +237,7 @@ HIVE_PRIMITIVE_TYPES = {
UUIDType: "string",
BinaryType: "binary",
FixedType: "binary",
+ UnknownType: "void",
}
diff --git a/pyiceberg/conversions.py b/pyiceberg/conversions.py
index de67cdff..7c1455d4 100644
--- a/pyiceberg/conversions.py
+++ b/pyiceberg/conversions.py
@@ -56,6 +56,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
strtobool,
)
@@ -154,6 +155,12 @@ def _(_: DecimalType, value_str: str) -> Decimal:
return Decimal(value_str)
+@partition_to_py.register(UnknownType)
+@handle_none
+def _(type_: UnknownType, _: str) -> None:
+ return None
+
+
@singledispatch
def to_bytes(
primitive_type: PrimitiveType, _: Union[bool, bytes, Decimal, date,
datetime, float, int, str, time, uuid.UUID]
@@ -324,3 +331,8 @@ def _(_: PrimitiveType, b: bytes) -> bytes:
def _(primitive_type: DecimalType, buf: bytes) -> Decimal:
unscaled = int.from_bytes(buf, "big", signed=True)
return unscaled_to_decimal(unscaled, primitive_type.scale)
+
+
+@from_bytes.register(UnknownType)
+def _(type_: UnknownType, buf: bytes) -> None:
+ return None
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 7c8aaaab..d9f84a42 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -166,6 +166,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
from pyiceberg.utils.concurrent import ExecutorFactory
@@ -670,6 +671,9 @@ class
_ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
def visit_uuid(self, _: UUIDType) -> pa.DataType:
return pa.binary(16)
+ def visit_unknown(self, _: UnknownType) -> pa.DataType:
+ return pa.null()
+
def visit_binary(self, _: BinaryType) -> pa.DataType:
return pa.large_binary()
@@ -1220,6 +1224,8 @@ class
_ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
elif pa.types.is_fixed_size_binary(primitive):
primitive = cast(pa.FixedSizeBinaryType, primitive)
return FixedType(primitive.byte_width)
+ elif pa.types.is_null(primitive):
+ return UnknownType()
raise TypeError(f"Unsupported type: {primitive}")
@@ -1900,6 +1906,9 @@ class
PrimitiveToPhysicalType(SchemaVisitorPerPrimitiveType[str]):
def visit_binary(self, binary_type: BinaryType) -> str:
return "BYTE_ARRAY"
+ def visit_unknown(self, unknown_type: UnknownType) -> str:
+ return "UNKNOWN"
+
_PRIMITIVE_TO_PHYSICAL_TYPE_VISITOR = PrimitiveToPhysicalType()
diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 2bed2ce8..d33e13b4 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -444,7 +444,7 @@ def _(type: IcebergType, value: Optional[Union[int,
datetime]]) -> Optional[int]
elif isinstance(value, datetime):
return datetime_to_micros(value)
else:
- raise ValueError(f"Unknown type: {value}")
+ raise ValueError(f"Type not recognized: {value}")
@_to_partition_representation.register(DateType)
@@ -456,7 +456,7 @@ def _(type: IcebergType, value: Optional[Union[int, date]])
-> Optional[int]:
elif isinstance(value, date):
return date_to_days(value)
else:
- raise ValueError(f"Unknown type: {value}")
+ raise ValueError(f"Type not recognized: {value}")
@_to_partition_representation.register(TimeType)
diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py
index 5a373cb1..7f6cfe99 100644
--- a/pyiceberg/schema.py
+++ b/pyiceberg/schema.py
@@ -60,6 +60,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
@@ -531,8 +532,10 @@ class
PrimitiveWithPartnerVisitor(SchemaWithPartnerVisitor[P, T]):
return self.visit_fixed(primitive, primitive_partner)
elif isinstance(primitive, BinaryType):
return self.visit_binary(primitive, primitive_partner)
+ elif isinstance(primitive, UnknownType):
+ return self.visit_unknown(primitive, primitive_partner)
else:
- raise ValueError(f"Unknown type: {primitive}")
+ raise ValueError(f"Type not recognized: {primitive}")
@abstractmethod
def visit_boolean(self, boolean_type: BooleanType, partner: Optional[P])
-> T:
@@ -590,6 +593,10 @@ class
PrimitiveWithPartnerVisitor(SchemaWithPartnerVisitor[P, T]):
def visit_binary(self, binary_type: BinaryType, partner: Optional[P]) -> T:
"""Visit a BinaryType."""
+ @abstractmethod
+ def visit_unknown(self, unknown_type: UnknownType, partner: Optional[P])
-> T:
+ """Visit a UnknownType."""
+
class PartnerAccessor(Generic[P], ABC):
@abstractmethod
@@ -707,8 +714,10 @@ class SchemaVisitorPerPrimitiveType(SchemaVisitor[T], ABC):
return self.visit_uuid(primitive)
elif isinstance(primitive, BinaryType):
return self.visit_binary(primitive)
+ elif isinstance(primitive, UnknownType):
+ return self.visit_unknown(primitive)
else:
- raise ValueError(f"Unknown type: {primitive}")
+ raise ValueError(f"Type not recognized: {primitive}")
@abstractmethod
def visit_fixed(self, fixed_type: FixedType) -> T:
@@ -766,6 +775,10 @@ class SchemaVisitorPerPrimitiveType(SchemaVisitor[T], ABC):
def visit_binary(self, binary_type: BinaryType) -> T:
"""Visit a BinaryType."""
+ @abstractmethod
+ def visit_unknown(self, unknown_type: UnknownType) -> T:
+ """Visit a UnknownType."""
+
@dataclass(init=True, eq=True, frozen=True)
class Accessor:
diff --git a/pyiceberg/types.py b/pyiceberg/types.py
index bd0eb7a5..456f9ad8 100644
--- a/pyiceberg/types.py
+++ b/pyiceberg/types.py
@@ -148,13 +148,15 @@ class IcebergType(IcebergBaseModel):
return UUIDType()
if v == "binary":
return BinaryType()
+ if v == "unknown":
+ return UnknownType()
if v.startswith("fixed"):
return FixedType(_parse_fixed_type(v))
if v.startswith("decimal"):
precision, scale = _parse_decimal_type(v)
return DecimalType(precision, scale)
else:
- raise ValueError(f"Unknown type: {v}")
+ raise ValueError(f"Type not recognized: {v}")
if isinstance(v, dict) and cls == IcebergType:
complex_type = v.get("type")
if complex_type == "list":
@@ -747,3 +749,19 @@ class BinaryType(PrimitiveType):
"""
root: Literal["binary"] = Field(default="binary")
+
+
+class UnknownType(PrimitiveType):
+ """An unknown data type in Iceberg can be represented using an instance of
this class.
+
+ Unknowns in Iceberg are used to represent data types that are not known at
the time of writing.
+
+ Example:
+ >>> column_foo = UnknownType()
+ >>> isinstance(column_foo, UnknownType)
+ True
+ >>> column_foo
+ UnknownType()
+ """
+
+ root: Literal["unknown"] = Field(default="unknown")
diff --git a/pyiceberg/utils/schema_conversion.py
b/pyiceberg/utils/schema_conversion.py
index 8a303b7f..17eb2051 100644
--- a/pyiceberg/utils/schema_conversion.py
+++ b/pyiceberg/utils/schema_conversion.py
@@ -47,6 +47,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
from pyiceberg.utils.decimal import decimal_required_bytes
@@ -62,6 +63,7 @@ PRIMITIVE_FIELD_TYPE_MAPPING: Dict[str, PrimitiveType] = {
"long": LongType(),
"string": StringType(),
"enum": StringType(),
+ "null": UnknownType(),
}
LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
@@ -209,9 +211,9 @@ class AvroSchemaConversion:
elif isinstance(type_identifier, str) and type_identifier in
PRIMITIVE_FIELD_TYPE_MAPPING:
return PRIMITIVE_FIELD_TYPE_MAPPING[type_identifier]
else:
- raise TypeError(f"Unknown type: {avro_type}")
+ raise TypeError(f"Type not recognized: {avro_type}")
else:
- raise TypeError(f"Unknown type: {avro_type}")
+ raise TypeError(f"Type not recognized: {avro_type}")
def _convert_field(self, field: Dict[str, Any]) -> NestedField:
"""Convert an Avro field into an Iceberg equivalent field.
@@ -618,3 +620,6 @@ class
ConvertSchemaToAvro(SchemaVisitorPerPrimitiveType[AvroType]):
def visit_binary(self, binary_type: BinaryType) -> AvroType:
return "bytes"
+
+ def visit_unknown(self, unknown_type: UnknownType) -> AvroType:
+ return "null"
diff --git a/tests/avro/test_reader.py b/tests/avro/test_reader.py
index c97d421d..c713201b 100644
--- a/tests/avro/test_reader.py
+++ b/tests/avro/test_reader.py
@@ -37,6 +37,7 @@ from pyiceberg.avro.reader import (
TimeReader,
TimestampReader,
TimestamptzReader,
+ UnknownReader,
UUIDReader,
)
from pyiceberg.avro.resolver import construct_reader
@@ -55,12 +56,12 @@ from pyiceberg.types import (
IntegerType,
LongType,
NestedField,
- PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
@@ -325,13 +326,7 @@ def test_binary_reader() -> None:
def test_unknown_type() -> None:
- class UnknownType(PrimitiveType):
- root: str = "UnknownType"
-
- with pytest.raises(ValueError) as exc_info:
- construct_reader(UnknownType())
-
- assert "Unknown type:" in str(exc_info.value)
+ assert construct_reader(UnknownType()) == UnknownReader()
def test_uuid_reader() -> None:
diff --git a/tests/avro/test_writer.py b/tests/avro/test_writer.py
index 39b8ecc3..951be7e7 100644
--- a/tests/avro/test_writer.py
+++ b/tests/avro/test_writer.py
@@ -21,8 +21,6 @@ import struct
from _decimal import Decimal
from typing import Dict, List
-import pytest
-
from pyiceberg.avro.encoder import BinaryEncoder
from pyiceberg.avro.resolver import construct_writer
from pyiceberg.avro.writer import (
@@ -38,6 +36,7 @@ from pyiceberg.avro.writer import (
TimestamptzWriter,
TimestampWriter,
TimeWriter,
+ UnknownWriter,
UUIDWriter,
)
from pyiceberg.typedef import Record
@@ -54,12 +53,12 @@ from pyiceberg.types import (
LongType,
MapType,
NestedField,
- PrimitiveType,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
@@ -127,13 +126,7 @@ def test_binary_writer() -> None:
def test_unknown_type() -> None:
- class UnknownType(PrimitiveType):
- root: str = "UnknownType"
-
- with pytest.raises(ValueError) as exc_info:
- construct_writer(UnknownType())
-
- assert "Unknown type:" in str(exc_info.value)
+ assert construct_writer(UnknownType()) == UnknownWriter()
def test_uuid_writer() -> None:
diff --git a/tests/conftest.py b/tests/conftest.py
index e9abd9bf..1cbb3cfa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -921,7 +921,7 @@ EXAMPLE_TABLE_METADATA_V3 = {
{"id": 2, "name": "y", "required": True, "type": "long",
"doc": "comment"},
{"id": 3, "name": "z", "required": True, "type": "long"},
# TODO: Add unknown, timestamp(tz)_ns
- # {"id": 4, "name": "u", "required": True, "type": "unknown"},
+ {"id": 4, "name": "u", "required": True, "type": "unknown"},
# {"id": 5, "name": "ns", "required": True, "type":
"timestamp_ns"},
# {"id": 6, "name": "nstz", "required": True, "type":
"timestamptz_ns"},
],
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index ee5f8a25..1eb3500a 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -56,6 +56,7 @@ from pyiceberg.types import (
NestedField,
StringType,
TimestampType,
+ UnknownType,
)
from pyiceberg.utils.concurrent import ExecutorFactory
@@ -978,3 +979,39 @@ def test_scan_with_datetime(catalog: Catalog) -> None:
df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas()
assert len(df) == 0
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog",
[pytest.lazy_fixture("session_catalog_hive")])
+def test_read_unknown_type(catalog: Catalog) -> None:
+ identifier = "default.test_table_read_unknown_type"
+ arrow_table = pa.Table.from_pydict(
+ {
+ "int": [1, 2],
+ "string": ["a", "b"],
+ "unknown": [None, None],
+ },
+ schema=pa.schema(
+ [
+ pa.field("int", pa.int32(), nullable=True),
+ pa.field("string", pa.string(), nullable=True),
+ pa.field("unknown", pa.null(), nullable=True),
+ ],
+ ),
+ )
+
+ try:
+ catalog.drop_table(identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = catalog.create_table(
+ identifier,
+ schema=arrow_table.schema,
+ )
+
+ tbl.append(arrow_table)
+
+ assert tbl.schema().find_type("unknown") == UnknownType()
+ result_table = tbl.scan().to_arrow()
+ assert result_table["unknown"].to_pylist() == [None, None]
diff --git a/tests/test_transforms.py b/tests/test_transforms.py
index 3ad3ff5a..8987f8b1 100644
--- a/tests/test_transforms.py
+++ b/tests/test_transforms.py
@@ -106,6 +106,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
from pyiceberg.utils.datetime import (
@@ -202,6 +203,31 @@ def test_bucket_method(type_var: PrimitiveType) -> None:
assert bucket_transform.to_human_string(type_var, "test") == "test"
+@pytest.mark.parametrize(
+ "test_transform",
+ [
+ BucketTransform(8),
+ TruncateTransform(10),
+ YearTransform(),
+ MonthTransform(),
+ DayTransform(),
+ HourTransform(),
+ UnknownTransform("unknown"),
+ ],
+)
+def test_transforms_unknown_type(test_transform: Transform[Any, Any]) -> None:
+ assert not test_transform.can_transform(UnknownType())
+ with pytest.raises((ValueError, AttributeError)):
+ test_transform.transform(UnknownType())
+
+
+def test_identity_transform_unknown_type() -> None:
+ assert IdentityTransform().can_transform(UnknownType())
+ assert IdentityTransform().result_type(UnknownType()) == UnknownType()
+ assert IdentityTransform().transform(UnknownType())(None) is None
+ assert IdentityTransform().to_human_string(UnknownType(), None) == "null"
+
+
def test_string_with_surrogate_pair() -> None:
string_with_surrogate_pair = "string with a surrogate pair: 💰"
as_bytes = bytes(string_with_surrogate_pair, UTF8)
diff --git a/tests/utils/test_schema_conversion.py
b/tests/utils/test_schema_conversion.py
index 2c42c445..e60a8956 100644
--- a/tests/utils/test_schema_conversion.py
+++ b/tests/utils/test_schema_conversion.py
@@ -33,6 +33,7 @@ from pyiceberg.types import (
NestedField,
StringType,
StructType,
+ UnknownType,
)
from pyiceberg.utils.schema_conversion import AvroSchemaConversion
@@ -263,19 +264,19 @@ def test_fixed_type() -> None:
def test_unknown_primitive() -> None:
- with pytest.raises(TypeError) as exc_info:
- avro_type = "UnknownType"
- AvroSchemaConversion()._convert_schema(avro_type)
- assert "Unknown type: UnknownType" in str(exc_info.value)
+ avro_type = "null"
+ actual = AvroSchemaConversion()._convert_schema(avro_type)
+ expected = UnknownType()
+ assert actual == expected
-def test_unknown_complex_type() -> None:
+def test_unrecognized_complex_type() -> None:
with pytest.raises(TypeError) as exc_info:
avro_type = {
- "type": "UnknownType",
+ "type": "UnrecognizedType",
}
AvroSchemaConversion()._convert_schema(avro_type)
- assert "Unknown type: {'type': 'UnknownType'}" in str(exc_info.value)
+ assert "Type not recognized: {'type': 'UnrecognizedType'}" in
str(exc_info.value)
def test_convert_field_without_field_id() -> None: