[ 
https://issues.apache.org/jira/browse/BEAM-8732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16980206#comment-16980206
 ] 

Brian Hulette commented on BEAM-8732:
-------------------------------------

I'm definitely fine with you taking this on, but I'm not sure that calling 
{{typing_to_runner_api()}} is all that needs to happen. Since that function 
generates a random UUID for the schema, I think it'll get a different UUID when 
executed at pipeline construction time than when executed at pipeline execution 
time (on the workers), so the workers won't be able to look up the correct 
class in the registry and will generate one instead. Perhaps one solution would 
be to make the UUID generation deterministic somehow? But I'm not sure how 
feasible that is.

In general the solution to this problem in the Java SDK has been to include 
serialized java classes in the pipeline graph, e.g. 
[SchemaCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java]
 is currently just represented as a big serialized class including the user 
class and functions for converting to/from Row. We also currently allow for 
[java-specific logical 
types|https://github.com/apache/beam/blob/07d952f313477ee18cdc706100ba7e1810b1ef4f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java#L91-L102]
 which serialize a class for converting to/from user types.

cc: [~reuvenlax] who is more familiar with the Java side of this than me.

> Add support for additional structured types to Schemas/RowCoders
> ----------------------------------------------------------------
>
>                 Key: BEAM-8732
>                 URL: https://issues.apache.org/jira/browse/BEAM-8732
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chad Dombrova
>            Priority: Major
>
> Currently we can convert between a {{NamedTuple}} type and its {{Schema}} 
> protos using {{named_tuple_from_schema}} and {{named_tuple_to_schema}}. I'd 
> like to introduce a system to support additional types, starting with 
> structured types like {{attrs}}, {{dataclasses}}, and {{TypedDict}}.
> I've only just started digesting the code, but this task seems pretty 
> straightforward. For example, I think the type-to-schema code would look 
> roughly like this:
> {code:python}
> def typing_to_runner_api(type_):
>   # type: (Type) -> schema_pb2.FieldType
>   structured_handler = _get_structured_handler(type_)
>   if structured_handler:
>     schema = None
>     if hasattr(type_, 'id'):
>       schema = SCHEMA_REGISTRY.get_schema_by_id(type_.id)
>     if schema is None:
>       fields = structured_handler.get_fields()
>       type_id = str(uuid4())
>       schema = schema_pb2.Schema(fields=fields, id=type_id)
>       SCHEMA_REGISTRY.add(type_, schema)
>     return schema_pb2.FieldType(
>         row_type=schema_pb2.RowType(
>             schema=schema))
> {code}
> The rest of the work would be in implementing a class hierarchy for working 
> with structured types, such as getting a list of fields from an instance, and 
> instantiation from a list of fields. Eventually we can extend this behavior 
> to arbitrary, unstructured types.  
> Going in the schema-to-type direction, we have the problem of choosing which 
> type to use for a given schema. I believe that as long as 
> {{typing_to_runner_api()}} has been called on our structured type in the 
> current python session, it should be added to the registry and thus round 
> trip ok, so I think we just need a public function for registering schemas 
> for structured types.
> [~bhulette] Did you want to tackle this or are you ok with me going after it?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to