ahmedabu98 commented on code in PR #38206:
URL: https://github.com/apache/beam/pull/38206#discussion_r3094643399
##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -850,6 +850,22 @@ def estimate_size(self, unused_value, nested=False):
return 2
+class ByteCoderImpl(StreamCoderImpl):
+ """For internal use only; no backwards-compatibility guarantees."""
+ def encode_to_stream(self, value, out, nested):
+ # type: (int, create_OutputStream, bool) -> None
+ out.write_byte(value)
+
+ def decode_from_stream(self, in_stream, nested):
+ # type: (create_InputStream, bool) -> float
+ return in_stream.read_byte()
+
+ def estimate_size(self, unused_value, nested=False):
+ # type: (Any, bool) -> int
+ # A short is encoded as 2 bytes, regardless of nesting.
Review Comment:
nit: clean up this comment. It looks copied from the short coder; a byte is not encoded as 2 bytes.
##########
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java:
##########
@@ -178,4 +178,23 @@ public void testFieldTypeNotFound() {
thrown.expectMessage("Cannot find a matching Beam FieldType for Calcite
type: UNKNOWN");
CalciteUtils.toFieldType(relDataType);
}
+
+ @Test
+ public void testToRelDataTypeWithRowBackedLogicalType() {
+ Schema nestedSchema = Schema.builder().addField("nested_f1",
Schema.FieldType.INT32).build();
+ Schema.FieldType rowType = Schema.FieldType.row(nestedSchema);
+
+ Schema.LogicalType<org.apache.beam.sdk.values.Row,
org.apache.beam.sdk.values.Row> logicalType =
+ new org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType<
Review Comment:
readability nit: import these instead of using fully qualified names?
##########
sdks/python/apache_beam/transforms/sql_test.py:
##########
@@ -149,6 +162,34 @@ def test_row(self):
| SqlTransform("SELECT a*a as s, LENGTH(b) AS c FROM PCOLLECTION"))
assert_that(out, equal_to([(1, 1), (4, 1), (100, 2)]))
+ @staticmethod
+ def recover_to_python_type(input):
+ fields = []
+ for field in input:
+ print(field)
+ if hasattr(field, 'type_byte') and hasattr(field, 'payload'):
+ obj = coders.FastPrimitivesCoder().decode(
+ field.type_byte.to_bytes() + field.payload)
+ fields.append(obj)
+ else:
+ fields.append(field)
+ return tuple(fields)
+
+ def test_row_user_type(self):
+ with TestPipeline() as p:
+ out = (
+ p | beam.Create([
+ UserTypeRow(1, Aribitrary(1.0), 1 + 2.5j),
+ UserTypeRow(1, Aribitrary("abc"), -1j),
+ ])
+ | SqlTransform("SELECT arb, complex FROM PCOLLECTION")
+ # TODO: recover to user type. Currently pipeline can run,
Review Comment:
Should we create a GitHub issue for this TODO?
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -255,6 +256,36 @@ def schema_field(
description=description)
+def _python_any_schema_pb2():
+ return schema_pb2.FieldType(
+ logical_type=schema_pb2.LogicalType(
+ urn=PYTHON_ANY_URN,
+ representation=schema_pb2.FieldType(
+ nullable=False,
+ row_type=schema_pb2.RowType(
+ schema=schema_pb2.Schema(
+ fields=[
+ schema_pb2.Field(
+ name="type_byte",
+ type=schema_pb2.FieldType(
+ atomic_type=schema_pb2.BYTE,
nullable=False)),
+ schema_pb2.Field(
+ name="payload",
Review Comment:
And maybe add a comment on what these fields represent (IIUC, they're for
FastPrimitivesCoder?).
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -255,6 +256,36 @@ def schema_field(
description=description)
+def _python_any_schema_pb2():
+ return schema_pb2.FieldType(
+ logical_type=schema_pb2.LogicalType(
+ urn=PYTHON_ANY_URN,
+ representation=schema_pb2.FieldType(
+ nullable=False,
+ row_type=schema_pb2.RowType(
+ schema=schema_pb2.Schema(
+ fields=[
+ schema_pb2.Field(
+ name="type_byte",
+ type=schema_pb2.FieldType(
+ atomic_type=schema_pb2.BYTE,
nullable=False)),
+ schema_pb2.Field(
+ name="payload",
Review Comment:
Can we also create module-level constants for `"type_byte"` and
`"payload"`?
##########
sdks/python/apache_beam/transforms/sql_test.py:
##########
@@ -149,6 +162,34 @@ def test_row(self):
| SqlTransform("SELECT a*a as s, LENGTH(b) AS c FROM PCOLLECTION"))
assert_that(out, equal_to([(1, 1), (4, 1), (100, 2)]))
+ @staticmethod
+ def recover_to_python_type(input):
+ fields = []
+ for field in input:
+ print(field)
Review Comment:
nit: remove this debug `print`.
##########
sdks/python/apache_beam/transforms/sql_test.py:
##########
@@ -149,6 +162,34 @@ def test_row(self):
| SqlTransform("SELECT a*a as s, LENGTH(b) AS c FROM PCOLLECTION"))
assert_that(out, equal_to([(1, 1), (4, 1), (100, 2)]))
+ @staticmethod
+ def recover_to_python_type(input):
+ fields = []
+ for field in input:
+ print(field)
+ if hasattr(field, 'type_byte') and hasattr(field, 'payload'):
+ obj = coders.FastPrimitivesCoder().decode(
+ field.type_byte.to_bytes() + field.payload)
+ fields.append(obj)
+ else:
+ fields.append(field)
+ return tuple(fields)
Review Comment:
Should we make this utility public?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]