I believe we should make this easy for users rather than disabling
something that we as runner authors find a nuisance, and I believe the
runners should do whatever is necessary so that the output encoding is
respected, based upon how they choose to execute a flatten.

I agree with Robert's point that the output coder should be at least the
upper bound of all the inputs, and that it is up to the SDK/user to
guarantee this. This still allows the input coders to be different from the
output coder, and the runner should perform any graph rewriting/transcoding
it deems necessary to make sure that any encoded representations can be
decoded successfully.
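
To make this concrete, here is a rough sketch (not taken from any existing
runner) of the kind of rewrite I mean, applied to the coders from Etienne's
example below; the transform and variable names are made up and this uses
the Beam Java SDK (MapElements, TypeDescriptors, NullableCoder, VarLongCoder):

  // Hypothetical recoding step a runner (or the SDK) could inject in front
  // of a Flatten whose output coder is NullableCoder(VarLongCoder) but whose
  // input pc2 still uses VarLongCoder. The identity MapElements exists only
  // so the elements can be given the output coder before the union.
  // (pc1 would need the same treatment for its BigEndianLongCoder encoding.)
  PCollection<Long> recoded =
      pc2.apply("RecodeForFlatten",
              MapElements.into(TypeDescriptors.longs()).via((Long x) -> x))
          .setCoder(NullableCoder.of(VarLongCoder.of()));
  // The runner would then feed "recoded" (instead of pc2) into the Flatten,
  // so every input's encoded form can be decoded with the output coder.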

On Wed, Jun 17, 2020 at 7:48 AM Etienne Chauchot <echauc...@apache.org>
wrote:

> Hi,
>
> I had forgotten about this subject and came across this thread again
> recently, so I re-tested:
>
> - what the new Spark runner does (even in local mode), and I guess what
> all the other runners do:
>
>          - encodes PC1 with the user-specified
> NullableCoder(BigEndianLongCoder) to be able to pass data over the network
>
>          - encodes PC2 with the user-specified VarLongCoder to be able
> to pass data over the network
>
>          - unions the 2 collections
>
>          - decodes using the specified output coder
> NullableCoder(VarLongCoder)
>
> => there, when the output coder encounters an element encoded with
> VarLongCoder, it fails with a compatibility exception because, as Robert
> said, the coders are not compatible and neither are the elements encoded
> with them.
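>
> A rough way to reproduce the same failure at the coder level, outside of
> any runner, using CoderUtils from the Beam Java SDK (an illustration, not
> the exact exception the Spark runner reports):
>
>   // Encode a value the way PC2's elements are encoded...
>   byte[] bytes = CoderUtils.encodeToByteArray(VarLongCoder.of(), 42L);
>   // ...and try to decode it with the output coder. NullableCoder expects a
>   // leading null/present marker byte that VarLongCoder never wrote, so the
>   // decode fails with a CoderException.
>   Long decoded = CoderUtils.decodeFromByteArray(
>       NullableCoder.of(VarLongCoder.of()), bytes);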
>
> As a consequence I have some remarks:
>
> => The current Spark runner (RDD) performs no checks on the coders (at
> least for flatten), so the test passes only because it runs locally and
> no serialization happens. But such a pipeline with heterogeneous
> coders will fail on a real cluster just like the above. For Flink, an
> exception is raised saying that the input coders are different, so it
> relies on the user to fix their pipeline.
>
> => I agree with Robert that such a problem should be dealt with on the
> SDK/user side. The runner should not change the user-specified output
> coder to one compatible with the input coders, or change the user-specified
> input coders to one compatible with the output coder. I think this problem
> is not only for flatten but for all transforms, as a given PCollection
> could be assigned a coder that is not compatible with the coder of the
> previous step of the pipeline.
>
> => As a consequence, I propose to let the CoderException be raised and
> to expect the exception in the test. But for the local Spark RDD runner
> used in tests, which does not try to serialize, the exception will not be
> thrown. Nor will it be for the local direct runner, because it does not
> serialize either, and it seems the enforceEncodability parameter does not
> lead to an exception in that case.
>
> WDYT?
>
> Etienne
>
> On 20/12/2019 19:40, Robert Bradshaw wrote:
> > The problem here is that T and Nullable<T> are two different types,
> > but are not distinguished as such in the Java type system (and hence
> > are treated interchangeably there, modulo certain cases where one can
> > use a @Nullable annotation). They also have incompatible encodings.
> >
> > In my view, it is the SDK's job to ensure the output type (and
> > corresponding coder) of a Flatten is a least upper bound of those of
> > its inputs. It could do this by being restrictive on the Coders of its
> > inputs (e.g. requiring them to be equal), being intelligent (actually
> > able to compute least upper bounds), or placing the burden on the user
> > (e.g. requiring the user to specify it, possibly only when there is
> > ambiguity about what coder to choose).
> >
> > On the other hand, I don't think that in the portability protos we
> > should require that all coders to a flatten be equal, only that the
> > output coder be sufficiently powerful to encode the union of all
> > possible elements (equivalently, it is a valid graph transformation to
> > swap out all input coders for the given output coder). Runners should
> > and must do recodings as necessary depending on their flatten
> > implementations (which will often be as simple as requesting a
> > consistent coding on the output ports, but may involve injecting
> > identity operations to do (possibly virtual) recoding in some cases),
> > e.g.
> >
> > pc1[int]
> >      \
> >        Flatten -- pc12[int]
> >      /
> > pc2[int]
> >      \
> >        Flatten -- pc23[Optional[int]]
> >      /
> > pc3[Optional[int]]
> >
> >
> > On Thu, Dec 19, 2019 at 3:09 PM Luke Cwik <lc...@google.com> wrote:
> >> I'm pretty sure that Flatten with different coders is well defined.
> >> input: List<PCollection<T>>
> >> output: PCollection<T>
> >>
> >> When flatten is executed using T rather than encoded(T), transcoding can
> be optimized away because the coder for the output PCollection<T> is assumed
> to be able to encode all T's. The DirectRunner specifically does this
> transcoding check on elements to help pipeline authors catch this kind of
> error. Alternatively, an SDK could require a method like "bool canEncode(T)"
> on coders, which could be a very cheap way to ensure that values can be
> transcoded (this would work for many but not all value types). When the
> execution is occurring on encoded(T), the bytes need to be transcoded
> somehow, since the downstream transform is expected to get an encoding
> compatible with the output PCollection's encoding.
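> >>
> >> A sketch of the idea with today's API (canEncode does not exist on Coder,
> >> and the helper name is made up; a coder-specific implementation, e.g.
> >> NullableCoder(VarLongCoder) accepting any Long or null, could avoid the
> >> actual encode and be much cheaper than this CoderUtils-based version):
> >>
> >>   // Hypothetical helper approximating "bool canEncode(T)": attempt the
> >>   // encode and treat a CoderException as "this value cannot be encoded".
> >>   static <T> boolean canEncode(Coder<T> coder, T value) {
> >>     try {
> >>       CoderUtils.encodeToByteArray(coder, value);
> >>       return true;
> >>     } catch (CoderException e) {
> >>       return false;
> >>     }
> >>   }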
> >>
> >> The example that flattens Nullable<Long> and Long would be valid
> since the output PCollection accepts all of the supported input types.
> >>
> >> I believe all runners need to transcode if they are operating on
> encoded(T) when the input PCollection coder is not the same as the output
> PCollection coder. If they are operating on T's, then it's optional since
> it's a choice between performance and debuggability.
> >>
> >>
> >> On Wed, Dec 11, 2019 at 3:47 AM Etienne Chauchot <echauc...@apache.org>
> wrote:
> >>> Ok,
> >>>
> >>> Thanks Kenn.
> >>>
> >>> The Flatten javadoc says that by default the coder of the output should
> be the coder of the first input. But in the test, the output coder is set
> to something different. While waiting for a consensus on this model point
> and a common implementation in the runners, I'll just exclude this test as
> the other runners do.
> >>>
> >>> Etienne
> >>>
> >>> On 11/12/2019 04:46, Kenneth Knowles wrote:
> >>>
> >>> It is a good point. Nullable(VarLong) and VarLong are two different
> types, with a least upper bound that is Nullable(VarLong). BigEndianLong and
> VarLong are two different types, with no least upper bound in the "coders"
> type system. Yet we understand that the values they encode are equal. I do
> not think it is clearly formalized anywhere what the rules are (corollary:
> they have not been thought about carefully).
> >>>
> >>> I think both possibilities are reasonable:
> >>>
> >>> 1. Make the rule that Flatten only accepts inputs with identical
> coders. This will sometimes be annoying, requiring vacuous "re-encode"
> no-op ParDos (they will be fused away on maybe all runners).
> >>> 2. Define types as the domain of values, and Flatten accepts sets of
> PCollections with the same domain of values. Runners must "do whatever it
> takes" to respect the coders on the collection.
> >>> 2a. For very simple cases, Flatten takes the least upper bound of the
> input types. The output coder of Flatten has to be this least upper bound.
> For example, a non-nullable output coder would be an error.
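> >>>
> >>> A very rough sketch of what 2a could look like in code, covering only
> >>> the Nullable case from this thread (no such API exists in Beam today;
> >>> the method name is made up, Optional is java.util.Optional):
> >>>
> >>>   // Hypothetical least upper bound of two coders in the sense above.
> >>>   // lub(VarLongCoder, NullableCoder(VarLongCoder)) = NullableCoder(VarLongCoder)
> >>>   // lub(BigEndianLongCoder, VarLongCoder) = empty: same values, no common coder
> >>>   static Optional<Coder<?>> leastUpperBound(Coder<?> a, Coder<?> b) {
> >>>     if (a.equals(b)) {
> >>>       return Optional.of(a);
> >>>     }
> >>>     if (a instanceof NullableCoder
> >>>         && ((NullableCoder<?>) a).getValueCoder().equals(b)) {
> >>>       return Optional.of(a);
> >>>     }
> >>>     if (b instanceof NullableCoder
> >>>         && ((NullableCoder<?>) b).getValueCoder().equals(a)) {
> >>>       return Optional.of(b);
> >>>     }
> >>>     return Optional.empty();
> >>>   }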
> >>>
> >>> Very interesting and nuanced problem. Flatten just became quite an
> interesting transform, for me :-)
> >>>
> >>> Kenn
> >>>
> >>> On Tue, Dec 10, 2019 at 12:37 AM Etienne Chauchot <
> echauc...@apache.org> wrote:
> >>>> Hi all,
> >>>>
> >>>> I have a question about the testFlattenMultipleCoders test:
> >>>>
> >>>> This test uses 2 collections
> >>>>
> >>>> 1. long and null data encoded using NullableCoder(BigEndianLongCoder)
> >>>>
> >>>> 2. long data encoded using VarLongCoder
> >>>>
> >>>> It then flattens the 2 collections and sets the coder of the resulting
> >>>> collection to NullableCoder(VarLongCoder)
> >>>>
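> >>>> For reference, the pipeline the test builds looks roughly like this (a
> >>>> sketch, with made-up element values, not the exact test code):
> >>>>
> >>>>   PCollection<Long> pc1 =
> >>>>       p.apply(Create.of(1L, 2L, (Long) null)
> >>>>           .withCoder(NullableCoder.of(BigEndianLongCoder.of())));
> >>>>   PCollection<Long> pc2 =
> >>>>       p.apply(Create.of(3L, 4L).withCoder(VarLongCoder.of()));
> >>>>   PCollection<Long> flattened =
> >>>>       PCollectionList.of(pc1).and(pc2)
> >>>>           .apply(Flatten.pCollections())
> >>>>           .setCoder(NullableCoder.of(VarLongCoder.of()));
> >>>>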
> >>>> Most runners translate flatten as a simple union of the 2 PCollections
> >>>> without any re-encoding. As a result, all the runners exclude this test
> >>>> from the test set because of coder issues. For example, Flink raises an
> >>>> exception in its flatten translation if the type of elements in
> >>>> PCollection1 is different from the type of elements in PCollection2.
> >>>> Another example is the direct runner and the Spark (RDD-based) runner,
> >>>> which do not exclude this test, but only because they don't need to
> >>>> serialize elements and so never even call the coders.
> >>>>
> >>>> That means that having an output PCollection of the flatten with
> >>>> heterogeneous coders is not really tested, so it is not really
> >>>> supported.
> >>>>
> >>>> Should we drop this test case (which no runner executes), or should
> >>>> we force each runner to re-encode?
> >>>>
> >>>> Best
> >>>>
> >>>> Etienne
> >>>>
> >>>>
> >>>>
>
