TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r730135297



##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -169,6 +170,7 @@ def __init__(self, schema, components):
         field.type.nullable for field in self.schema.fields)
 
   def encode_to_stream(self, value, out, nested):
+    self.schema = SCHEMA_REGISTRY.get_schema_by_id(self.schema.id)

Review comment:
       Is this necessary?

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -190,7 +193,10 @@ def encode_to_stream(self, value, out, nested):
               "Attempted to encode null for non-nullable field \"{}\".".format(
                   field.name))
         continue
-      c.encode_to_stream(attr, out, True)
+      attrs_enc_pos.append((c, field.encoding_position, attr))

Review comment:
       I think we should only need to read the encoding positions once, when 
constructing the coder. We don't need to inspect the encoding positions and 
sort for every element.
   
   Instead, this should read the encoding positions in `__init__`, and 
transform them into a list of indexes that will encode in the proper order 
(`numpy.argsort` should do this). For the case where encoding positions aren't 
specified the list could just be `list(range(len(self.schema.fields)))`.
   
   It may also make sense to validate the encoding positions when constructing 
the coder (they should all be unique values in the range `[0, 
len(self.schema.fields))`), to be safe.

##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -138,8 +138,9 @@ def named_fields_to_schema(names_and_types):
   # type: (Sequence[Tuple[str, type]]) -> schema_pb2.Schema
   return schema_pb2.Schema(
       fields=[
-          schema_pb2.Field(name=name, type=typing_to_runner_api(type))
-          for (name, type) in names_and_types
+          schema_pb2.Field(
+              name=name, type=typing_to_runner_api(type), 
encoding_position=idx)
+          for idx, (name, type) in enumerate(names_and_types)

Review comment:
       I think we should actually leave the encoding positions undefined when 
we create schemas in this file. The encoding position is something that the 
runner will manipulate to ensure update compatibility, the SDK doesn't need to 
worry about it.

##########
File path: sdks/go/test/regression/coders/fromyaml/fromyaml.go
##########
@@ -83,11 +88,13 @@ func (s *Spec) testStandardCoder() (err error) {
                log.Printf("skipping unimplemented coder urn: %v", s.Coder.Urn)
                return nil
        }
-       // TODO(BEAM-9615): Support Logical types, and produce a better error 
message.
-       if strings.Contains(s.Coder.Payload, "logical") {
-               log.Printf("skipping coder with logical type. Unsupported in 
the Go SDK for now. Payload: %v", s.Coder.Payload)
-               return nil
+       for _, c := range filteredCases {
+               if strings.Contains(s.Coder.Payload, c.filter) {
+                       log.Printf("skipping coder case. Unsupported in the Go 
SDK for now: %v Payload: %v", c.reason, s.Coder.Payload)
+                       return nil
+               }

Review comment:
       I have very limited knowledge of Go, but this LGTM and works. Does it 
look ok to you @lostluck?

##########
File path: 
model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,26 @@ examples:
   
"\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f":
 {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00":
 {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# fields = [("foo", str), ("bar", str)]
+# schema = typing.NamedTuple( "test", fields)
+# coder = RowCoder.from_type_hint(schema, None)
+# examples = (
+#             coder.encode(schema(foo="str1",bar="str2")),
+#             coder.encode(schema(bar="str2",foo="str1"))
+#            )
+# print("schema = %s" % coder.schema.SerializeToString())

Review comment:
       You may need to use a different approach to generate test cases here. 
We'll want to have some test cases where the encoding position is different 
from the field order. In order to do that I think you'll need to manually 
construct a Schema proto object (as is done in some places in 
`schemas_test.py`), and make sure it has encoding positions that are out of 
order. Then you can serialize it with `SerializeToString()`. Does that make 
sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to