TheNeuralBit commented on a change in pull request #11901:
URL: https://github.com/apache/beam/pull/11901#discussion_r437536115



##########
File path: sdks/python/apache_beam/transforms/sql.py
##########
@@ -74,3 +77,8 @@ def __init__(self, query, dialect=None):
             SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
+
+
+class Row(object):
+  def __init__(self, **kwargs):
+    self.__dict__.update(kwargs)

Review comment:
       Mm, nothing comes to mind. I suppose it could just be `apache_beam.Row`
for now, and we can alias it if we add a schema package with other top-level
schema stuff later.
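
For what it's worth, a minimal sketch of how that might read for users,
assuming the class above ends up re-exported at the package top level as
`apache_beam.Row` (that exact spelling is the open question here, not
something settled):

```python
# Hedged sketch: assumes the Row class from transforms/sql.py above is
# re-exported as apache_beam.Row.
import apache_beam as beam

row = beam.Row(id=1, name='one')  # keyword args become attributes
assert row.id == 1
assert row.name == 'one'
```

Since `Row.__init__` just updates `self.__dict__`, the keyword arguments come
back out as plain attribute access, which is all SqlTransform needs.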

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -82,8 +86,19 @@ def from_runner_api_parameter(schema, components, unused_context):
     return RowCoder(schema)
 
   @staticmethod
-  def from_type_hint(named_tuple_type, registry):
-    return RowCoder(named_tuple_to_schema(named_tuple_type))
+  def from_type_hint(type_hint, registry):
+    if isinstance(type_hint, row_type.RowTypeConstraint):
+      schema = schema_pb2.Schema(
+          fields=[
+              schema_pb2.Field(
+                  name=name,
+                  type=typing_to_runner_api(type))
+              for (name, type) in type_hint._fields
+          ],
+          id=str(uuid.uuid4()))

Review comment:
       Could you move this inference to `typehints.schemas` alongside
`named_tuple_to_schema`? I have a WIP PR for batching schema'd PCollections
that are inputs to DataFrames, and I should re-use this logic there.
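
For illustration, a rough sketch of what the extracted helper in
`apache_beam.typehints.schemas` could look like next to
`named_tuple_to_schema`. The name `row_type_to_schema` is hypothetical, and
the imports assume a standalone module (inside `schemas.py` they'd already be
in scope); the body just mirrors the inference in the diff above:

```python
# Hypothetical helper; name and placement are assumptions, the logic is
# lifted from RowCoder.from_type_hint in this PR.
import uuid

from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import row_type
from apache_beam.typehints.schemas import typing_to_runner_api


def row_type_to_schema(type_hint):
  """Converts a RowTypeConstraint into a schema_pb2.Schema proto."""
  if not isinstance(type_hint, row_type.RowTypeConstraint):
    raise ValueError('Expected a RowTypeConstraint, got %r' % (type_hint,))
  return schema_pb2.Schema(
      fields=[
          schema_pb2.Field(name=name, type=typing_to_runner_api(typ))
          for (name, typ) in type_hint._fields
      ],
      id=str(uuid.uuid4()))
```

`RowCoder.from_type_hint` would then reduce to dispatching on the hint type
and calling this (or `named_tuple_to_schema`) to build the schema.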



