On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw <[email protected]> wrote:

> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <[email protected]> wrote:
> >
> > Spoke to Brian about his proposal. It is essentially this:
> >
> > We create PortableSchemaCoder, with a well-known URN. This coder is
> parameterized by the schema (i.e. list of field name -> field type pairs).
>
> Given that we have a field type that is (list of field names -> field
> type pairs), is there a reason to do this enumeration at the top level
> as well? This would likely also eliminate some of the strangeness
> where we want to treat a PCollection with a single-field row as a
> PCollection with just that value instead.
>
> > Java also continues to have its own CustomSchemaCoder. This is
> parameterized by the schema as well as the to/from functions needed to make
> the Java API "nice."
> >
> > When the expansion service expands a Java PTransform for usage across
> languages, it will add a transform mapping the PCollection with
> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
> Java can maintain the information needed to maintain its API (and Python
> can do the same), but there's no need to shove this information into the
> well-known portable representation.
> >
> > Brian, can you confirm that this was your proposal? If so, I like it.
>
> The major downside of this that I see is that it assumes that
> transparency is only needed at certain "boundaries" and everything
> between these boundaries is opaque. I think we'd be better served by a
> format where schemas are transparently represented throughout. For
> example, the "boundaries" between runner and SDK are not known at
> pipeline construction time, and we want the runner <-> SDK
> communication to understand the schemas to be able to use more
> efficient transport mechanisms (e.g. batches of arrow records). It may
> also be common for a pipeline in language X to invoke two transforms
> in language Y in succession (e.g. two SQL statements) in which case
> introducing two extra transforms in the expansion service would be
> wasteful. I also think we want to allow the flexibility for runners to
> swap out transforms as optimizations regardless of construction-time
> boundaries (e.g. implementing a projection natively, rather than
> outsourcing to the SDK).
>

Agreed that ideally we would provide transparency everywhere, and not just
at certain boundaries. What I had in mind was that the JavaSchemaCoder's
portable representation wouldn't be entirely opaque; it would somehow be
encoded as <portable schema> + <java specific to/from function> so that in
theory a runner could inspect the schema. I suppose if you let the runner
inspect that schema, that raises the question "why not let some other SDK do
that as well?" I guess I don't have a good answer for that. It just feels
wrong that a schema coder created by Java and consumed by Python would
include functions to convert *from* some Java type, and also back *to* some
Java type.
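
A minimal sketch of the <portable schema> + <SDK-specific to/from function>
idea, in Python for brevity. This is not an existing Beam API: the names
PortableSchema, SdkSchemaCoder, owner_sdk, sdk_payload, field_names and
usable_payload are all hypothetical, and field types are reduced to plain
strings. It only illustrates that the schema part can stay readable by a
runner or another SDK while the to/from functions travel as an opaque blob.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class PortableSchema:
    # Ordered (field name, field type) pairs; types are plain strings here.
    fields: Tuple[Tuple[str, str], ...]


@dataclass(frozen=True)
class SdkSchemaCoder:
    # Hypothetical coder payload: transparent schema plus opaque SDK extras.
    schema: PortableSchema               # anyone may inspect this part
    owner_sdk: Optional[str] = None      # e.g. "java"; None means no extras
    sdk_payload: Optional[bytes] = None  # opaque serialized to/from functions


def field_names(coder: SdkSchemaCoder) -> List[str]:
    """What a runner or another SDK can do without reading the payload."""
    return [name for name, _ in coder.schema.fields]


def usable_payload(coder: SdkSchemaCoder, sdk: str) -> Optional[bytes]:
    """Only the SDK that produced the payload should try to interpret it."""
    return coder.sdk_payload if coder.owner_sdk == sdk else None


java_coder = SdkSchemaCoder(
    schema=PortableSchema((("id", "INT64"), ("name", "STRING"))),
    owner_sdk="java",
    sdk_payload=b"<serialized java to/from functions>",
)
```

In this sketch a Python consumer calling usable_payload(java_coder, "python")
gets nothing back and simply treats the elements as rows, which is the
awkward part described above: the Java-only functions are still carried
along even though the consumer can never use them.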


> Are the to/from conversion functions the only extra information needed
> to make the Java APIs nice? If so, can they be attached to the
> operations themselves (where it seems they're actually needed/used),
> rather than to the schema/coder of the PCollection? Alternatively, I'd
> prefer this be opaque metadata attached to a transparent schema rather
> than making the whole schema opaque.
>

Yes, the to/from functions (and a serialized class) are all we need. I
agree, I think ideally these functions would be properties of the
operations. That makes much more sense for the Java -> Python case I cited
above. I've tried to think of how we could do that - but if we attach the
conversions to the operations, then we have a sequence of operations that
appear to operate on Rows, but internally use some Java user types. Then if
those operations are fused together, there would be an unnecessary
conversion to Row and back again, rather than just passing the user type
instances directly. Maybe there is some clever way to avoid that that I'm
not aware of?
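
The fusion concern can be illustrated with a toy Python sketch. Nothing here
is Beam code: User, to_row, from_row, run_naive and run_elided are
hypothetical stand-ins, and rows are plain dicts. The sketch assumes each
operation carries its own conversions, then compares running two fused
operations naively against a version that converts only at the edges of the
fused stage.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class User:
    name: str
    age: int


# Counts how often each conversion runs, to make the overhead visible.
conversions: Dict[str, int] = {"to_row": 0, "from_row": 0}


def to_row(user: User) -> dict:
    conversions["to_row"] += 1
    return {"name": user.name, "age": user.age}


def from_row(row: dict) -> User:
    conversions["from_row"] += 1
    return User(row["name"], row["age"])


def birthday(user: User) -> User:
    return User(user.name, user.age + 1)


def shout(user: User) -> User:
    return User(user.name.upper(), user.age)


def run_naive(ops: List[Callable[[User], User]], row: dict) -> dict:
    # Every operation converts Row -> User on input and User -> Row on output.
    for op in ops:
        row = to_row(op(from_row(row)))
    return row


def run_elided(ops: List[Callable[[User], User]], row: dict) -> dict:
    # Convert once at each edge of the fused stage; pass User in between.
    user = from_row(row)
    for op in ops:
        user = op(user)
    return to_row(user)
```

With two operations, run_naive performs two conversions in each direction
while run_elided performs one, and both produce the same row. The open
question in the paragraph above is how a runner or SDK would know it is safe
to take the second path when the conversions are attached to operations
rather than to the coder.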


>
> > We've gone back and forth discussing this in the abstract for over a month now. I
> suggest that the next step should be to create a PR, and move discussion to
> that PR. Having actual code can often make discussion much more concrete.
>
> +1 to a PR, though I feel like there are fundamental high-level issues
> that are still not decided. (I suppose we should be open to throwing
> whole PRs away in that case.) There are certainly pieces that we'll
> know that we need (like the ability to serialize a row consistently in
> all languages) that we can get in immediately.
>
> > Reuven
> >
> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw <[email protected]>
> wrote:
> >>
> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <[email protected]> wrote:
> >>>
> >>>
> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <[email protected]>
> wrote:
> >>>>
> >>>> Can we choose a first step? I feel there's consensus around:
> >>>>
> >>>>  - the basic idea of what a schema looks like, ignoring logical types
> or SDK-specific bits
> >>>>  - the version of logical type which is a standardized URN+payload
> plus a representation
> >>>>
> >>>> Perhaps we could commit this and see what it looks like to try to use
> it?
> >>
> >>
> >> +1
> >>
> >>>>
> >>>> It also seems like there might be consensus around the idea of each
> of:
> >>>>
> >>>>  - a coder that simply encodes rows; its payload is just a schema; it
> is minimalist, canonical
> >>>>
> >>>>  - a coder that encodes a non-row using the serialization format of a
> row; this has to be a coder (versus Convert transforms) so that to/from row
> conversions can be elided when primitives are fused (just like to/from
> bytes is elided)
> >>
> >>
> >> So, to make it concrete, in the Beam protos we would have an
> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
> whose definition is in terms of URN + payload + components (+
> representation, for non-primitive types, some details TBD there). It could
> be deserialized into various different Coder instances (an SDK
> implementation detail) in an SDK depending on the type. One of the most
> important primitive field types is Row (aka Struct).
> >>
> >> We would define a byte encoding for each primitive type. We *could*
> choose to simply require that the encoding of any non-row primitive is the
> same as its encoding in a single-member row, but that's not necessary.
> >>
> >> In the short term, the window/timestamp/pane info would still live
> outside via an enclosing WindowCoder, as it does now, not blocking on a
> desirable but still-to-be-figured-out unification at that level.
> >>
> >> This seems like a good path forward.
> >>
> >>> Actually this doesn't make sense to me. I think from the portability
> perspective, all we have is schemas - the rest is just a convenience for
> the SDK. As such, I don't think it makes sense at all to model this as a
> Coder.
> >>
> >>
> >> Coder and Schemas are mutually exclusive on PCollections, and
> completely specify type information, so I think it makes sense to reuse
> this (as we're currently doing) until we can get rid of coders altogether.
> >>
> >> (At execution time, we would generalize the notion of a coder to
> indicate how *batches* of elements are encoded, not just how individual
> elements are encoded. Here we have the option of letting the runner pick
> depending on the use (e.g. elementwise for key lookups vs. arrow for bulk
> data channel transfer vs ???, possibly with parameters like "preferred
> batch size") or standardizing on one physical byte representation for all
> communication over the boundary.)
> >>
> >>>
> >>>
> >>>>
> >>>>
> >>>> Can we also just have both of these, with different URNs?
> >>>>
> >>>> Kenn
> >>>>
> >>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <[email protected]> wrote:
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw <[email protected]>
> wrote:
> >>>>>>
> >>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles <[email protected]>
> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>> I believe the schema registry is a transient construction-time
> concept. I don't think there's any need for a concept of a registry in the
> portable representation.
> >>>>>>>
> >>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used
> whenever one has (say) a Java POJO as that would prevent other SDKs from
> "understanding" it as above (unless we had a way of declaring it as "just
> an alias/wrapper").
> >>>>>>>
> >>>>>>>
> >>>>>>> I didn't understand the example I snipped, but I think I
> understand your concern here. Is this what you want? (a) something
> presented as a POJO in Java (b) encoded to a row, but still decoded to the
> POJO and (c) non-Java SDK knows that it is "just a struct" so it is safe to
> mess about with or even create new ones. If this is what you want it seems
> potentially useful, but also easy to live without. This can also be done
> entirely within the Java SDK via conversions, leaving no logical type in
> the portable pipeline.
> >>>>>>
> >>>>>>
> >>>>>> I'm imagining a world where someone defines a PTransform that takes a
> POJO for a constructor, and consumes and produces a POJO, and is now usable
> from Go with no additional work on the PTransform author's part.  But maybe
> I'm thinking about this wrong and the POJO <-> Row conversion is part of
> the @ProcessElement magic, not encoded in the schema itself.
> >>>>>
> >>>>>
> >>>>> The user's output would have to be explicitly schema. They would
> somehow have to tell Beam to infer a schema from the output POJO (e.g. one
> way to do this is to annotate the POJO with the @DefaultSchema
> annotation).  We don't currently magically turn a POJO into a schema unless
> we are asked to do so.
>
