I'm pretty sure that Flatten with different coders is well defined.
input: List<PCollection<T>>
output: PCollection<T>

When Flatten is executed on T rather than on encoded(T), the transcoding can be optimized away because the coder for the output PCollection<T> is assumed to be able to encode all T's. The DirectRunner specifically does this transcoding check on elements to help pipeline authors catch this kind of error. Alternatively, an SDK could require a method like "bool canEncode(T)" on coders, which could be a very cheap way to ensure that values can be transcoded (this would work for many but not all value types).

When the execution is occurring on encoded(T), the bytes need to be transcoded somehow, since the downstream transform expects an encoding compatible with the output PCollection's encoding.

The example that flattens Nullable<Long> and Long would be valid, since the output PCollection accepts all the supported input types.

I believe all runners need to transcode if they are operating on encoded(T) when the input PCollection coder is not the same as the output PCollection coder. If they are operating on T's, then it's optional, since it's a choice between performance and debuggability.

On Wed, Dec 11, 2019 at 3:47 AM Etienne Chauchot <echauc...@apache.org> wrote:

> Ok,
>
> Thanks Kenn.
>
> The Flatten javadoc says that by default the coder of the output should be
> the coder of the first input. But in the test, it sets the output coder to
> something different. Waiting for a consensus on this model point and a
> common impl in the runners, I'll just exclude this test as other runners do.
>
> Etienne
> On 11/12/2019 04:46, Kenneth Knowles wrote:
>
> It is a good point. Nullable(VarLong) and VarLong are two different types,
> with a least upper bound that is Nullable(VarLong). BigEndianLong and VarLong
> are two different types, with no least upper bound in the "coders" type
> system. Yet we understand that the values they encode are equal. I do not
> think the rules are clearly formalized anywhere (corollary:
> not thought carefully about).
>
> I think both possibilities are reasonable:
>
> 1. Make the rule that Flatten only accepts inputs with identical coders.
> This will be sometimes annoying, requiring vacuous "re-encode" noop ParDos
> (they will be fused away on maybe all runners).
> 2. Define types as the domain of values, and Flatten accepts sets of
> PCollections with the same domain of values. Runners must "do whatever it
> takes" to respect the coders on the collection.
> 2a. For very simple cases, Flatten takes the least upper bound of the
> input types. The output coder of Flatten has to be this least upper bound.
> For example, a non-nullable output coder would be an error.
>
> Very interesting and nuanced problem. Flatten just became quite an
> interesting transform, for me :-)
>
> Kenn
>
> On Tue, Dec 10, 2019 at 12:37 AM Etienne Chauchot <echauc...@apache.org>
> wrote:
>
>> Hi all,
>>
>> I have a question about the testFlattenMultipleCoders test:
>>
>> This test uses 2 collections
>>
>> 1. long and null data encoded using NullableCoder(BigEndianLongCoder)
>>
>> 2. long data encoded using VarLongCoder
>>
>> It then flattens the 2 collections and sets the coder of the resulting
>> collection to NullableCoder(VarLongCoder)
>>
>> Most runners translate flatten as a simple union of the 2 PCollections
>> without any re-encoding. As a result all the runners exclude this test
>> from the test set because of coder issues. For example, Flink raises an
>> exception if the type of elements in PCollection1 is different from the
>> type of PCollection2 in flatten translation. Another example is the direct
>> runner and the Spark (RDD based) runner, which do not exclude this test simply
>> because they don't need to serialize elements so they don't even call
>> the coders.
>>
>> That means that having an output PCollection of the flatten with
>> heterogeneous coders is not really tested so it is not really supported.
>>
>> Should we drop this test case (that is executed by no runner) or should
>> we force each runner to re-encode?
>>
>> Best
>>
>> Etienne
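
To make the transcoding point from the reply at the top of this thread concrete, here is a minimal sketch of a Flatten that operates on encoded(T). It assumes toy coders written only for illustration: the class names mirror the coders in the test, but they are hypothetical stand-ins and not the Beam classes or their exact wire formats. `transcode` and `flatten_encoded` are likewise made-up helper names, and "same coder" is approximated by object identity.

```python
import struct


class BigEndianLongCoder:
    """Hypothetical stand-in: fixed 8-byte big-endian signed long."""

    def encode(self, value):
        return struct.pack(">q", value)

    def decode(self, data):
        return struct.unpack(">q", data)[0]


class VarLongCoder:
    """Hypothetical stand-in: base-128 varint over the 64-bit two's complement."""

    def encode(self, value):
        bits = value & 0xFFFFFFFFFFFFFFFF
        out = bytearray()
        while True:
            byte = bits & 0x7F
            bits >>= 7
            if bits:
                out.append(byte | 0x80)  # more bytes follow
            else:
                out.append(byte)
                return bytes(out)

    def decode(self, data):
        bits = 0
        for index, byte in enumerate(data):
            bits |= (byte & 0x7F) << (7 * index)
        # Reinterpret the unsigned 64-bit pattern as a signed long.
        return bits - (1 << 64) if bits >= (1 << 63) else bits


class NullableCoder:
    """Hypothetical stand-in: one marker byte, then the inner encoding."""

    def __init__(self, inner):
        self.inner = inner

    def encode(self, value):
        if value is None:
            return b"\x00"
        return b"\x01" + self.inner.encode(value)

    def decode(self, data):
        if data[:1] == b"\x00":
            return None
        return self.inner.decode(data[1:])


def transcode(data, input_coder, output_coder):
    """Decode with the input coder, re-encode with the output coder."""
    return output_coder.encode(input_coder.decode(data))


def flatten_encoded(inputs, output_coder):
    """inputs is a list of (coder, list of encoded elements) pairs."""
    out = []
    for coder, elements in inputs:
        if coder is output_coder:
            out.extend(elements)  # same coder: plain union, no re-encoding
        else:
            out.extend(transcode(e, coder, output_coder) for e in elements)
    return out
```

With inputs encoded by NullableCoder(BigEndianLongCoder()) and VarLongCoder(), and an output coder of NullableCoder(VarLongCoder()), every output element decodes correctly with the output coder. In this sketch, choosing a non-nullable output coder such as plain VarLongCoder() fails with a TypeError as soon as a null element is re-encoded, which matches the "non-nullable output coder would be an error" case in 2a.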