I believe we should be making this easy for users rather than disabling something that we as runner authors find a nuisance. The runners should do whatever is necessary so that the output encoding is respected, based upon how they choose to execute a flatten.
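To make that concrete, a runner executing a flatten over encoded bytes can honor the output coder with a per-element recode step along the lines of the sketch below. This is only an illustration against the Beam Java SDK coder utilities; the class and method names are made up for the example and are not proposed API.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

/** Hypothetical helper, not Beam API: the recode step a runner could inject. */
class FlattenRecodeSketch {
  /**
   * Re-encodes one element that arrived in the input PCollection's coder
   * format so that downstream consumers see the Flatten output coder's format.
   */
  static <T> byte[] recode(byte[] inputBytes, Coder<T> inputCoder, Coder<T> outputCoder)
      throws CoderException {
    T element = CoderUtils.decodeFromByteArray(inputCoder, inputBytes);
    return CoderUtils.encodeToByteArray(outputCoder, element);
  }
}

Runners that operate on deserialized elements can skip this entirely; it only matters when elements move around as encoded bytes.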
I agree with Robert's point that the output coder should be at least the upper bound of all the inputs, and that it is up to the SDK/user to guarantee this. But this still allows the input coders to be different from the output coder, so the runner should perform whatever graph rewriting/transcoding it deems necessary to make sure that any encoded representations can be decoded successfully.

On Wed, Jun 17, 2020 at 7:48 AM Etienne Chauchot <echauc...@apache.org> wrote:
> Hi,
>
> I forgot about this subject and came across this thread again lately, so I tested again:
>
> - what the new spark runner does (even in local mode), and I guess what all the other runners do:
>
>     - encodes PC1 with the user-specified NullableCoder(BigEndianLongCoder) to be able to pass data over the network
>
>     - encodes PC2 with the user-specified VarlongCoder to be able to pass data over the network
>
>     - unions the 2 collections
>
>     - decodes using the specified output coder NullableCoder(VarlongCoder)
>
> => there, when the output coder comes across an element encoded with VarlongCoder, it fails with a compatibility exception because, as Robert said, the coders are not compatible and neither are elements encoded with them.
>
> As a consequence I have some remarks:
>
> => The current spark runner (RDD) adds no control on the coders (at least for flatten), so the test passes only because it is local and there is no serialization happening. But such a pipeline with heterogeneous coders will fail on a real cluster just like the above. For flink, an exception will be raised saying that the input coders are different, so it relies on the user to fix his pipeline.
>
> => I agree with Robert that such a problem should be dealt with on the SDK/user side. The runner should not change the user-specified output coder to one compatible with the input coders, or change the user-specified input coders to one compatible with the output coder. I think this problem is not only for flatten but for all transforms, as a given pcollection could be set a coder that is not compatible with the coder of the previous step of the pipeline.
>
> => As a consequence, I propose to let the CoderException be raised and expect the exception in the test. But for the local spark RDD runner used in tests, which does not try to serialize, the exception will not be thrown. For the local Direct runner it will not be thrown either, because it does not serialize, and it seems the enforceEncodability parameter does not lead to an exception in that case.
>
> WDYT?
>
> Etienne
>
> On 20/12/2019 19:40, Robert Bradshaw wrote:
> > The problem here is that T and Nullable<T> are two different types, but are not distinguished as such in the Java type system (and hence are treated interchangeably there, modulo certain cases where one can use a @Nullable annotation). They also have incompatible encodings.
> >
> > In my view, it is the SDK's job to ensure the output type (and corresponding coder) of a Flatten is a least upper bound of those of its inputs. It could do this by being restrictive on the Coders of its inputs (e.g. requiring them to be equal), being intelligent (actually able to compute least upper bounds), or placing the burden on the user (e.g. requiring the user to specify it, possibly only when there is ambiguity about what coder to choose).
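As a concrete illustration of the incompatibility both Robert and Etienne describe: a NullableCoder encoding starts with a 0/1 null-marker byte, so bytes produced by a plain VarLongCoder are not a valid NullableCoder(VarLongCoder) encoding. A minimal sketch (not the actual test code) that reproduces the exception outside any pipeline:

import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.CoderUtils;

public class CoderMismatchSketch {
  public static void main(String[] args) throws CoderException {
    // Bytes produced by the input coder of the second collection in the test...
    byte[] varLongBytes = CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L);

    // ...are not a valid encoding for the output coder: NullableCoder first
    // reads a 0/1 null-marker byte, so this decode throws a CoderException.
    CoderUtils.decodeFromByteArray(NullableCoder.of(VarLongCoder.of()), varLongBytes);
  }
}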
> > On the other hand, I don't think that in the portability protos we should require that all coders to a flatten be equal, only that the output coder be sufficiently powerful to encode the union of all possible elements (equivalently, it is a valid graph transformation to swap out all input coders for the given output coder). Runners should and must do recodings as necessary depending on their flatten implementations (which often will be as simple as requesting a consistent coding on the output ports, but may involve injecting identity operations to do (possibly virtual) recoding in some cases), e.g.
> >
> > pc1[int]
> >   \
> >    Flatten -- pc12[int]
> >   /
> > pc2[int]
> >   \
> >    Flatten -- pc23[Optional[int]]
> >   /
> > pc3[Optional[int]]
> >
> >
> > On Thu, Dec 19, 2019 at 3:09 PM Luke Cwik <lc...@google.com> wrote:
> >> I'm pretty sure that Flatten with different coders is well defined.
> >> input: List<PCollection<T>>
> >> output: PCollection<T>
> >>
> >> When flatten is executed using T vs encoded(T), transcoding can be optimized away because the coder for the output PCollection<T> is assumed to be able to encode all T's. The DirectRunner specifically does this transcoding check on elements to help pipeline authors catch this kind of error. Alternatively, an SDK could require a method like "bool canEncode(T)" on coders, which could be a very cheap way to ensure that values can be transcoded (this would work for many but not all value types). When the execution is occurring on encoded(T), then the bytes need to be transcoded somehow, since the downstream transform is expected to get an encoding compatible with the output PCollection's encoding.
> >>
> >> The example that flattens Nullable<Long> and Long would be valid since the output PCollection accepts all the supported input types.
> >>
> >> I believe all runners need to transcode if they are operating on encoded(T) when the input PCollection coder is not the same as the output PCollection coder. If they are operating on T's, then it's optional since it's a choice between performance and debuggability.
> >>
> >>
> >> On Wed, Dec 11, 2019 at 3:47 AM Etienne Chauchot <echauc...@apache.org> wrote:
> >>> Ok,
> >>>
> >>> Thanks Kenn.
> >>>
> >>> The Flatten javadoc says that by default the coder of the output should be the coder of the first input. But in the test, it sets the output coder to something different. Waiting for a consensus on this model point and a common impl in the runners, I'll just exclude this test as other runners do.
> >>>
> >>> Etienne
> >>>
> >>> On 11/12/2019 04:46, Kenneth Knowles wrote:
> >>>
> >>> It is a good point. Nullable(VarLong) and VarLong are two different types, with a least upper bound that is Nullable(VarLong). BigEndianLong and VarLong are two different types, with no least upper bound in the "coders" type system. Yet we understand that the values they encode are equal. I do not think it is clearly formalized anywhere what the rules are (corollary: not thought carefully about).
> >>>
> >>> I think both possibilities are reasonable:
> >>>
> >>> 1. Make the rule that Flatten only accepts inputs with identical coders. This will sometimes be annoying, requiring vacuous "re-encode" noop ParDos (they will be fused away on maybe all runners).
> >>> 2. Define types as the domain of values, and Flatten accepts sets of PCollections with the same domain of values. Runners must "do whatever it takes" to respect the coders on the collection.
> >>> 2a. For very simple cases, Flatten takes the least upper bound of the input types. The output coder of Flatten has to be this least upper bound. For example, a non-nullable output coder would be an error.
> >>>
> >>> Very interesting and nuanced problem. Flatten just became quite an interesting transform, for me :-)
> >>>
> >>> Kenn
> >>>
> >>> On Tue, Dec 10, 2019 at 12:37 AM Etienne Chauchot <echauc...@apache.org> wrote:
> >>>> Hi all,
> >>>>
> >>>> I have a question about the testFlattenMultipleCoders test:
> >>>>
> >>>> This test uses 2 collections:
> >>>>
> >>>> 1. long and null data encoded using NullableCoder(BigEndianLongCoder)
> >>>>
> >>>> 2. long data encoded using VarlongCoder
> >>>>
> >>>> It then flattens the 2 collections and sets the coder of the resulting collection to NullableCoder(VarlongCoder).
> >>>>
> >>>> Most runners translate flatten as a simple union of the 2 PCollections without any re-encoding. As a result, runners exclude this test from the test set because of coder issues. For example, flink raises an exception if the type of elements in PCollection1 is different from the type of elements in PCollection2 in the flatten translation. The exceptions are the direct runner and the spark (RDD based) runner, which do not exclude this test, but only because they don't need to serialize elements and so never even call the coders.
> >>>>
> >>>> That means that having an output PCollection of the flatten with heterogeneous coders is not really tested, so it is not really supported.
> >>>>
> >>>> Should we drop this test case (that is executed by no runner) or should we force each runner to re-encode?
> >>>>
> >>>> Best
> >>>>
> >>>> Etienne
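For readers landing on this thread later, the scenario in the original question boils down to roughly the following pipeline shape. This is a sketch using the Beam Java SDK, not the actual FlattenTest code, and it glosses over details such as how the null element is created:

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenMultipleCodersSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // PC1: longs and nulls, encoded with NullableCoder(BigEndianLongCoder).
    PCollection<Long> pc1 = p.apply("CreatePC1",
        Create.of(Arrays.asList(1L, null, 3L))
            .withCoder(NullableCoder.of(BigEndianLongCoder.of())));

    // PC2: longs only, encoded with VarLongCoder.
    PCollection<Long> pc2 = p.apply("CreatePC2",
        Create.of(4L, 5L).withCoder(VarLongCoder.of()));

    // Flatten the two collections, then give the result a third coder,
    // NullableCoder(VarLongCoder): it can represent every input element,
    // but its byte format matches neither input coder exactly.
    PCollection<Long> flattened =
        PCollectionList.of(pc1).and(pc2).apply(Flatten.<Long>pCollections());
    flattened.setCoder(NullableCoder.of(VarLongCoder.of()));

    p.run().waitUntilFinish();
  }
}

Whether this runs correctly depends on exactly the behavior discussed above: a runner that shuffles encoded bytes must recode the inputs to the output coder's format, while a local runner that never serializes will not notice the mismatch.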