Thanks, Brian. That makes sense; it wasn't entirely clear from the commit
messages, which is why I wanted to double-check.

On Tue, Sep 3, 2019 at 5:43 PM Brian Hulette <[email protected]> wrote:

> Hey Gleb, thanks for bringing this up. The PR that was reverted (8853) is
> the same one that I referred to earlier in this thread. It modified the
> existing portable schema representation to match what we settled on here -
> and yes it removed support for logical types like fixed bytes. I
> (foolishly) assumed the portable schema representation wasn't actually used
> anywhere yet, so I figured it would be a harmless regression until we were
> able to add support again with a logical type registry (BEAM-7855 [1]). But
> it turns out the proto representation is used to build cloud objects in the
> Dataflow runner, and so the change led to issues in Dataflow [2]. Thus the
> reversion.
>
> I absolutely agree that we should finish BEAM-7855 first before making
> this change again. I'm also taking a look at some of the tests that should
> have caught this issue to see how it slipped through the cracks.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-7855
> [2] https://issues.apache.org/jira/browse/BEAM-8111
>
> On Tue, Sep 3, 2019 at 7:27 AM Gleb Kanterov <[email protected]> wrote:
>
>> Recently there was a pull request [1] (since reverted) adding a portable
>> representation of schemas. It's great to see things moving forward, but I'm
>> worried that it doesn't support any logical types, especially fixed bytes.
>> That makes runners using portable schemas unusable in some cases, for
>> instance when reading certain Avro files, which worked before. I'm wondering
>> if it would be possible to include logical types in the initial
>> implementation, or to add a feature flag to opt in to the portable schema
>> representation.
>>
>> [1]: https://github.com/apache/beam/pull/8853
>>
>> Gleb
>>
>> On Wed, Jun 19, 2019 at 7:08 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> Seems like a practical approach to get moving on things. Just to restate
>>> my understanding:
>>>
>>>  - in Java it is PCollection<T> but with the row coder holding
>>> to/from/clazz (I'm calling it row coder because row is the binary format,
>>> while schemas may have multiple possible formats)
>>>  - in portability, the URN + payload "is" the type, so it is
>>> PCollection<row, to, from, clazz>
>>>  - in Python it is interpreted as PCollection<row>
>>>
>>> We may eventually hit an issue caused by the asymmetry between
>>> portability, the authoring SDK (in this case Java), and the external SDK
>>> (in this case, hypothetically, Python). But perhaps the asymmetry is
>>> natural/harmless. We may eventually want to generalize this so the payload
>>> carries identifying info about the authoring SDK.
>>>
>>> As to attaching conversions to operations rather than coders, I'm not so
>>> sure. It gets at the two natures of coders:
>>>
>>> 1. URN + payload fully specify how to interpret the bytes of an element
>>> 2. An SDK's implementation of the coder for a URN + payload is a
>>> contract with DoFns authored in that SDK about which SDK-specific type
>>> they will receive
>>>
>>> The idea of storing the to/from conversions on operations themselves
>>> would be a sort of decoupling of 1 and 2. Assuming we design something that
>>> still allows eliding conversions, I expect that will look nearly identical.
>>> Coders are representation to/from Bytes, while this conversion layer is T
>>> to/from representation. So I think we might be able to do this without
>>> adding anything to the model.
>>>
>>> Kenn
>>>
>>> On Tue, Jun 18, 2019 at 11:04 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Robert, you are correct that in principle the to/from functions are
>>>> needed on the operation, as that's where automatic conversion happens (in
>>>> Java it happens in DoFnRunner). However, there are three blockers there:
>>>>
>>>> 1. As Brian mentioned, the issue in Java is that we never have
>>>> PCollection<Row> in this case. The source PCollection will simply be
>>>> PCollection<T>, where T has a schema. The to/from functions are now
>>>> required to interpret this PCollection. Currently we need to put them on
>>>> the PCollection itself to make Java's type system happy (an alternative is to
>>>> always create an intermediate PCollection<Row>, but that would be
>>>> computationally expensive). We might be able to find a way to model this in
>>>> Java with the to/from on the operation, however I suspect it would be
>>>> difficult and a lot of work.
>>>>
>>>> 2. I believe there are some cases where PTransforms access the to/from
>>>> functions in expand(), which is before we have an operation to attach
>>>> those functions to. Again, this is presumably solvable, but would require
>>>> design and more work.
>>>>
>>>> 3. Currently the user can call setSchema on any PCollection, and pass
>>>> in to/from functions there. We would have to rethink this API.
>>>>
>>>> So I think leaving it in the coder is the pragmatic approach for now,
>>>> though it would be interesting to see if we could solve the above issues
>>>> and instead automatically propagate the functions to the operation.
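>>>>
>>>> To make (1) concrete, here is a minimal sketch of such a coder in Java.
>>>> This is not the exact Beam API (the class name SchemaBackedCoder is made
>>>> up): a coder for a user type T that keeps the to/from functions with it
>>>> and delegates the actual bytes to the row format, so a runner can elide
>>>> the conversions when primitives are fused:
>>>>
>>>>   import java.io.IOException;
>>>>   import java.io.InputStream;
>>>>   import java.io.OutputStream;
>>>>   import org.apache.beam.sdk.coders.CustomCoder;
>>>>   import org.apache.beam.sdk.coders.RowCoder;
>>>>   import org.apache.beam.sdk.schemas.Schema;
>>>>   import org.apache.beam.sdk.transforms.SerializableFunction;
>>>>   import org.apache.beam.sdk.values.Row;
>>>>
>>>>   // Sketch: a coder for T that encodes via the row binary format and
>>>>   // carries the T <-> Row conversions alongside the schema.
>>>>   public class SchemaBackedCoder<T> extends CustomCoder<T> {
>>>>     private final RowCoder rowCoder;
>>>>     private final SerializableFunction<T, Row> toRow;
>>>>     private final SerializableFunction<Row, T> fromRow;
>>>>
>>>>     public SchemaBackedCoder(
>>>>         Schema schema,
>>>>         SerializableFunction<T, Row> toRow,
>>>>         SerializableFunction<Row, T> fromRow) {
>>>>       this.rowCoder = RowCoder.of(schema);
>>>>       this.toRow = toRow;
>>>>       this.fromRow = fromRow;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void encode(T value, OutputStream out) throws IOException {
>>>>       rowCoder.encode(toRow.apply(value), out); // T -> Row -> bytes
>>>>     }
>>>>
>>>>     @Override
>>>>     public T decode(InputStream in) throws IOException {
>>>>       return fromRow.apply(rowCoder.decode(in)); // bytes -> Row -> T
>>>>     }
>>>>   }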
>>>>
>>>> I agree that we should not make these things opaque in the portable
>>>> representation, if only for ease of debugging. However, they should not be
>>>> needed for cross-language calls.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jun 18, 2019 at 5:09 AM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks for updating that alternative.
>>>>>
>>>>> As for the to/from functions, it does seem pragmatic to dangle them
>>>>> off the purely portable representation (either as a field there, or as
>>>>> an opaque logical type whose payload contains the to/from functions,
>>>>> or a separate coder that wraps the schema coder (though I can't see
>>>>> how the latter would work well if nesting is allowed)) until we figure
>>>>> out a good way to attach them to the operations themselves.
>>>>>
>>>>> On Tue, Jun 18, 2019 at 2:37 AM Brian Hulette <[email protected]>
>>>>> wrote:
>>>>> >
>>>>> > Realized I completely ignored one of your points, added another
>>>>> response inline.
>>>>> >
>>>>> > On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw <[email protected]>
>>>>> wrote:
>>>>> >>
>>>>> >> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <[email protected]>
>>>>> wrote:
>>>>> >> >
>>>>> >> > Spoke to Brian about his proposal. It is essentially this:
>>>>> >> >
>>>>> >> > We create PortableSchemaCoder, with a well-known URN. This coder
>>>>> is parameterized by the schema (i.e. list of field name -> field type
>>>>> pairs).
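>>>>>
>>>>> For illustration, hand-building such a schema with the Java SDK's Schema
>>>>> builder (a sketch; the field names here are made up):
>>>>>
>>>>>   import org.apache.beam.sdk.schemas.Schema;
>>>>>
>>>>>   // An ordered list of (field name, field type) pairs.
>>>>>   Schema schema =
>>>>>       Schema.builder()
>>>>>           .addStringField("user_id")
>>>>>           .addInt64Field("event_time")
>>>>>           .addArrayField("tags", Schema.FieldType.STRING)
>>>>>           .build();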
>>>>> >>
>>>>> >> Given that we have a field type that is (list of field names -> field
>>>>> >> type pairs), is there a reason to do this enumeration at the top level
>>>>> >> as well? This would likely also eliminate some of the strangeness
>>>>> >> where we want to treat a PCollection with a single-field row as a
>>>>> >> PCollection with just that value instead.
>>>>> >
>>>>> >
>>>>> > This is part of what I was suggesting in my "Root schema is a
>>>>> logical type" alternative [1], except that the language about SDK-specific
>>>>> logical types is now obsolete. I'll update it to better reflect this
>>>>> alternative.
>>>>> > I do think at the very least we should just have one (list of field
>>>>> names -> field type pairs) that is re-used, which is what I did in my PR
>>>>> [2].
>>>>> >
>>>>> > [1]
>>>>> https://docs.google.com/document/d/1uu9pJktzT_O3DxGd1-Q2op4nRk4HekIZbzi-0oTAips/edit#heading=h.7570feur1qin
>>>>> > [2]
>>>>> https://github.com/apache/beam/pull/8853/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2L686
>>>>> >
>>>>> >>
>>>>> >>
>>>>> >> > Java also continues to have its own CustomSchemaCoder. This is
>>>>> parameterized by the schema as well as the to/from functions needed to
>>>>> make the Java API "nice."
>>>>> >> >
>>>>> >> > When the expansion service expands a Java PTransform for use
>>>>> across languages, it will add a transform mapping the PCollection with
>>>>> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
>>>>> Java can maintain the information needed to maintain its API (and Python
>>>>> can do the same), but there's no need to shove this information into the
>>>>> well-known portable representation.
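>>>>>
>>>>> For shape only, a hypothetical sketch of such a mapping transform (this
>>>>> is not actual expansion service code): a trivial DoFn that applies the
>>>>> Java-side to-Row function, so the output PCollection carries plain Rows
>>>>> under PortableSchemaCoder.
>>>>>
>>>>>   import org.apache.beam.sdk.transforms.DoFn;
>>>>>   import org.apache.beam.sdk.transforms.SerializableFunction;
>>>>>   import org.apache.beam.sdk.values.Row;
>>>>>
>>>>>   // Hypothetical boundary transform: T (with CustomSchemaCoder)
>>>>>   // -> Row (with PortableSchemaCoder).
>>>>>   class ToPortableRows<T> extends DoFn<T, Row> {
>>>>>     private final SerializableFunction<T, Row> toRow;
>>>>>
>>>>>     ToPortableRows(SerializableFunction<T, Row> toRow) {
>>>>>       this.toRow = toRow;
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public void processElement(@Element T element, OutputReceiver<Row> out) {
>>>>>       out.output(toRow.apply(element));
>>>>>     }
>>>>>   }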
>>>>> >> >
>>>>> >> > Brian, can you confirm that this was your proposal? If so, I like
>>>>> it.
>>>>> >>
>>>>> >> The major downside of this that I see is that it assumes that
>>>>> >> transparency is only needed at certain "boundaries" and everything
>>>>> >> between these boundaries is opaque. I think we'd be better served by a
>>>>> >> format where schemas are transparently represented throughout. For
>>>>> >> example, the "boundaries" between runner and SDK are not known at
>>>>> >> pipeline construction time, and we want the runner <-> SDK
>>>>> >> communication to understand the schemas to be able to use more
>>>>> >> efficient transport mechanisms (e.g. batches of arrow records). It may
>>>>> >> also be common for a pipeline in language X to invoke two transforms
>>>>> >> in language Y in succession (e.g. two SQL statements) in which case
>>>>> >> introducing two extra transforms in the expansion service would be
>>>>> >> wasteful. I also think we want to allow the flexibility for runners to
>>>>> >> swap out transforms as optimizations regardless of construction-time
>>>>> >> boundaries (e.g. implementing a projection natively, rather than
>>>>> >> outsourcing to the SDK).
>>>>> >>
>>>>> >> Are the to/from conversion functions the only extra information needed
>>>>> >> to make the Java APIs nice? If so, can they be attached to the
>>>>> >> operations themselves (where it seems they're actually needed/used),
>>>>> >> rather than to the schema/coder of the PCollection? Alternatively, I'd
>>>>> >> prefer this be opaque metadata attached to a transparent schema rather
>>>>> >> than making the whole schema opaque.
>>>>> >>
>>>>> >> > We've gone back and forth discussing abstractions for over a month
>>>>> now. I suggest that the next step should be to create a PR and move
>>>>> discussion to that PR. Having actual code can often make discussion much
>>>>> more concrete.
>>>>> >>
>>>>> >> +1 to a PR, though I feel like there are fundamental high-level issues
>>>>> >> that are still not decided. (I suppose we should be open to throwing
>>>>> >> whole PRs away in that case.) There are certainly pieces that we know
>>>>> >> we'll need (like the ability to serialize a row consistently in all
>>>>> >> languages) that we can get in immediately.
>>>>> >>
>>>>> >> > Reuven
>>>>> >> >
>>>>> >> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw <
>>>>> [email protected]> wrote:
>>>>> >> >>
>>>>> >> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <[email protected]>
>>>>> wrote:
>>>>> >> >>>
>>>>> >> >>>
>>>>> >> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <
>>>>> [email protected]> wrote:
>>>>> >> >>>>
>>>>> >> >>>> Can we choose a first step? I feel there's consensus around:
>>>>> >> >>>>
>>>>> >> >>>>  - the basic idea of what a schema looks like, ignoring
>>>>> logical types or SDK-specific bits
>>>>> >> >>>>  - the version of logical type which is a standardized
>>>>> URN+payload plus a representation
>>>>> >> >>>>
>>>>> >> >>>> Perhaps we could commit this and see what it looks like to try
>>>>> to use it?
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> +1
>>>>> >> >>
>>>>> >> >>>>
>>>>> >> >>>> It also seems like there might be consensus around the idea of
>>>>> each of:
>>>>> >> >>>>
>>>>> >> >>>>  - a coder that simply encodes rows; its payload is just a
>>>>> schema; it is minimalist, canonical
>>>>> >> >>>>
>>>>> >> >>>>  - a coder that encodes a non-row using the serialization
>>>>> format of a row; this has to be a coder (versus Convert transforms) so
>>>>> that to/from row conversions can be elided when primitives are fused
>>>>> (just like to/from bytes is elided)
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> So, to make it concrete, in the Beam protos we would have an
>>>>> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
>>>>> whose definition is in terms of URN + payload + components (+
>>>>> representation, for non-primitive types; some details TBD there). It could
>>>>> be deserialized into various different Coder instances in an SDK (an SDK
>>>>> implementation detail), depending on the type. One of the most
>>>>> important primitive field types is Row (aka Struct).
>>>>> >> >>
>>>>> >> >> We would define a byte encoding for each primitive type. We
>>>>> *could* choose to simply require that the encoding of any non-row
>>>>> primitive is the same as its encoding in a single-member row, but that's
>>>>> not necessary.
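>>>>>
>>>>> As a quick sketch of what a consistent row encoding buys us, a round
>>>>> trip through the Java SDK's existing RowCoder (schema and values here
>>>>> are made up):
>>>>>
>>>>>   import java.io.ByteArrayInputStream;
>>>>>   import java.io.ByteArrayOutputStream;
>>>>>   import org.apache.beam.sdk.coders.RowCoder;
>>>>>   import org.apache.beam.sdk.schemas.Schema;
>>>>>   import org.apache.beam.sdk.values.Row;
>>>>>
>>>>>   public class RowRoundTrip {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>       Schema schema =
>>>>>           Schema.builder().addStringField("id").addInt64Field("count").build();
>>>>>       Row row = Row.withSchema(schema).addValues("user-42", 7L).build();
>>>>>
>>>>>       // Encode to the row binary format and decode back; any SDK that
>>>>>       // implements the same format can do either half of this trip.
>>>>>       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>>>>>       RowCoder.of(schema).encode(row, bytes);
>>>>>       Row decoded = RowCoder.of(schema).decode(
>>>>>           new ByteArrayInputStream(bytes.toByteArray()));
>>>>>     }
>>>>>   }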
>>>>> >> >>
>>>>> >> >> In the short term, the window/timestamp/pane info would still
>>>>> live outside via an enclosing WindowCoder, as it does now, not blocking on
>>>>> a desirable but still-to-be-figured-out unification at that level.
>>>>> >> >>
>>>>> >> >> This seems like a good path forward.
>>>>> >> >>
>>>>> >> >>> Actually this doesn't make sense to me. I think from the
>>>>> portability perspective, all we have is schemas - the rest is just a
>>>>> convenience for the SDK. As such, I don't think it makes sense at all to
>>>>> model this as a Coder.
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> Coders and Schemas are mutually exclusive on PCollections, and
>>>>> completely specify type information, so I think it makes sense to reuse
>>>>> this (as we're currently doing) until we can get rid of coders altogether.
>>>>> >> >>
>>>>> >> >> (At execution time, we would generalize the notion of a coder to
>>>>> indicate how *batches* of elements are encoded, not just how individual
>>>>> elements are encoded. Here we have the option of letting the runner pick
>>>>> depending on the use (e.g. elementwise for key lookups vs. arrow for bulk
>>>>> data channel transfer vs ???, possibly with parameters like "preferred
>>>>> batch size") or standardizing on one physical byte representation for all
>>>>> communication over the boundary.)
>>>>> >> >>
>>>>> >> >>>
>>>>> >> >>>
>>>>> >> >>>>
>>>>> >> >>>>
>>>>> >> >>>> Can we also just have both of these, with different URNs?
>>>>> >> >>>>
>>>>> >> >>>> Kenn
>>>>> >> >>>>
>>>>> >> >>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <[email protected]>
>>>>> wrote:
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw <
>>>>> [email protected]> wrote:
>>>>> >> >>>>>>
>>>>> >> >>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles <
>>>>> [email protected]> wrote:
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I believe the schema registry is a transient
>>>>> construction-time concept. I don't think there's any need for a concept of
>>>>> a registry in the portable representation.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used
>>>>> whenever one has (say) a Java POJO as that would prevent other SDKs from
>>>>> "understanding" it as above (unless we had a way of declaring it as "just
>>>>> an alias/wrapper").
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I didn't understand the example I snipped, but I think I
>>>>> understand your concern here. Is this what you want? (a) something
>>>>> presented as a POJO in Java, (b) encoded to a row but still decoded to
>>>>> the POJO, and (c) a non-Java SDK knows that it is "just a struct", so it
>>>>> is safe to mess about with or even create new ones. If this is what you
>>>>> want, it seems potentially useful, but also easy to live without. This
>>>>> can also be done
>>>>> entirely within the Java SDK via conversions, leaving no logical type in
>>>>> the portable pipeline.
>>>>> >> >>>>>>
>>>>> >> >>>>>>
>>>>> >> >>>>>> I'm imagining a world where someone defines a PTransform that
>>>>> takes a POJO for a constructor, and consumes and produces a POJO, and is
>>>>> now usable from Go with no additional work on the PTransform author's
>>>>> part.  But maybe I'm thinking about this wrong and the POJO <-> Row
>>>>> conversion is part of the @ProcessElement magic, not encoded in the
>>>>> schema itself.
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>> The user's output would have to explicitly have a schema. They
>>>>> would somehow have to tell Beam to infer a schema from the output POJO
>>>>> (e.g. one way to do this is to annotate the POJO with the @DefaultSchema
>>>>> annotation). We don't currently magically turn a POJO into a schema
>>>>> unless we are asked to do so.
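>>>>>
>>>>> For example, a sketch of asking for that inference via the annotation
>>>>> (the annotation and JavaFieldSchema provider are Java SDK names; the
>>>>> POJO itself is made up):
>>>>>
>>>>>   import org.apache.beam.sdk.schemas.JavaFieldSchema;
>>>>>   import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>>>>
>>>>>   // Beam infers a schema from the public fields of this POJO.
>>>>>   @DefaultSchema(JavaFieldSchema.class)
>>>>>   public class Person {
>>>>>     public String name;
>>>>>     public long age;
>>>>>   }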
>>>>>
>>>>
>>
>> --
>> Cheers,
>> Gleb
>>
>

-- 
Cheers,
Gleb
