Hi,

I had forgotten about this subject and came across this thread again recently, so I tested once more:

- Here is what the new Spark runner does (even in local mode), and I guess what all the other runners do:

        - encodes PC1 with the user-specified NullableCoder(BigEndianLongCoder) to be able to pass data over the network

        - encodes PC2 with the user-specified VarLongCoder to be able to pass data over the network

        - unions the 2 collections

        - decodes using the specified output coder NullableCoder(VarLongCoder)

=> There, when the output coder comes across an element encoded with VarLongCoder, it fails with a compatibility exception because, as Robert said, the coders are not compatible, and neither are the elements encoded with them.
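
To make the incompatibility concrete, here is a minimal sketch (outside any runner, the class name is mine) that reproduces the failure: NullableCoder prepends a null-marker byte that VarLongCoder never writes, so decoding VarLongCoder bytes with NullableCoder(VarLongCoder) typically throws a CoderException.

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class CoderMismatchSketch {
      public static void main(String[] args) throws Exception {
        Coder<Long> inputCoder = VarLongCoder.of();                    // coder of PC2
        Coder<Long> outputCoder = NullableCoder.of(VarLongCoder.of()); // output coder of the flatten

        // PC2's elements cross the network encoded with VarLongCoder...
        byte[] wireBytes = CoderUtils.encodeToByteArray(inputCoder, 42L);

        // ...but the flattened output is decoded with NullableCoder(VarLongCoder),
        // which expects a leading 0/1 null-marker byte: this line throws a
        // CoderException (the exact error depends on the encoded value).
        Long decoded = CoderUtils.decodeFromByteArray(outputCoder, wireBytes);
        System.out.println(decoded);
      }
    }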

As a consequence I have some remarks:

=> The current Spark runner (RDD based) performs no checks on the coders (at least for flatten), so the test passes only because it runs locally and no serialization happens. Such a pipeline with heterogeneous coders would fail on a real cluster just like the above. For Flink, an exception is raised saying that the input coders are different, so it relies on the user to fix their pipeline.

=> I agree with Robert that such a problem should be dealt with on the SDK/user side. The runner should not change the user-specified output coder to one compatible with the input coders, or change the user-specified input coders to one compatible with the output coder. I think this problem is not limited to flatten but applies to all transforms, as a given PCollection could be assigned a coder that is not compatible with the coder of the previous step of the pipeline.

=> As a consequence, I propose to let the CoderException be raised and to expect that exception in the test. But for the local Spark RDD runner used in tests, which does not try to serialize, the exception will not be thrown. Neither will it be for the local Direct runner, because it does not serialize either, and it seems the enforceEncodability parameter does not lead to an exception in that case.
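
If we go that way, the assertion could look roughly like the sketch below (assuming JUnit's assertThrows, Hamcrest's instanceOf and Guava's Throwables are available; how, and whether, the CoderException surfaces depends on the runner, as noted above).

    // Inside the test, after building the flatten with heterogeneous coders
    // on a TestPipeline p (see the test shape further down this thread).
    Exception e = assertThrows(Exception.class, () -> p.run().waitUntilFinish());
    // Runners wrap execution failures differently; the root cause should be the CoderException.
    assertThat(Throwables.getRootCause(e), instanceOf(CoderException.class));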

WDYT?

Etienne

On 20/12/2019 19:40, Robert Bradshaw wrote:
The problem here is that T and Nullable<T> are two different types,
but are not distinguished as such in the Java type system (and hence
are treated interchangeably there, modulo certain cases where one can
use a @Nullable annotation). They also have incompatible encodings.

In my view, it is the SDK's job to ensure the output type (and
corresponding coder) of a Flatten is a least upper bound of those of
its inputs. It could do this by being restrictive on the Coders of its
inputs (e.g. requiring them to be equal), being intelligent (actually
able to compute least upper bounds), or placing the burden on the user
(e.g. requiring the user to specify it, possibly only when there is
ambiguity about what coder to choose).
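
A hypothetical sketch of what the "restrictive" and "intelligent" options could look like at pipeline construction time (the helper and its name are purely illustrative, not an existing Beam API):

    import java.util.List;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.NullableCoder;

    /** Illustrative only: choose an output coder for Flatten from its input coders. */
    class FlattenCoders {
      static <T> Coder<T> leastUpperBound(List<Coder<T>> inputCoders) {
        Coder<T> first = inputCoders.get(0);

        // Restrictive option: accept only identical input coders.
        if (inputCoders.stream().allMatch(first::equals)) {
          return first;
        }

        // "Intelligent" option, for one simple case: if the coders differ only by
        // nullability, the least upper bound is the nullable variant, since
        // Nullable(C) can encode every value that C can.
        Coder<T> nullableFirst =
            first instanceof NullableCoder ? first : NullableCoder.of(first);
        boolean differOnlyByNullability =
            inputCoders.stream()
                .map(c -> c instanceof NullableCoder ? c : NullableCoder.of(c))
                .allMatch(nullableFirst::equals);
        if (differOnlyByNullability) {
          return nullableFirst;
        }

        // Otherwise, place the burden on the user.
        throw new IllegalArgumentException(
            "No least upper bound for input coders " + inputCoders
                + "; please set the Flatten output coder explicitly.");
      }
    }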

On the other hand, I don't think that in the portability protos we
should require that all inputs to a flatten have equal coders, only
that the output coder be sufficiently powerful to encode the union of
all possible elements (equivalently, it is a valid graph transformation
to swap out all input coders for the given output coder). Runners
should and must do recodings as necessary depending on their flatten
implementations (which often will be as simple as requesting a
consistent coding on the output ports, but may involve injecting
identity operations to do (possibly virtual) recoding in some cases),
e.g.

pc1[int]
     \
       Flatten -- pc12[int]
     /
pc2[int]
     \
       Flatten -- pc23[Optional[int]]
     /
pc3[Optional[int]]
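
In Beam Java terms, the diagram could look roughly like the snippet below (element values and step names are illustrative, and Nullable plays the role of Optional):

    // Assuming: Pipeline p = Pipeline.create();
    PCollection<Long> pc1 =
        p.apply("pc1", Create.of(1L).withCoder(VarLongCoder.of()));
    PCollection<Long> pc2 =
        p.apply("pc2", Create.of(2L).withCoder(VarLongCoder.of()));
    PCollection<Long> pc3 =
        p.apply("pc3", Create.of(3L, (Long) null)
            .withCoder(NullableCoder.of(VarLongCoder.of())));

    // pc12: both inputs already use VarLongCoder, so no recoding is needed.
    PCollection<Long> pc12 =
        PCollectionList.of(pc1).and(pc2)
            .apply("Flatten12", Flatten.pCollections())
            .setCoder(VarLongCoder.of());

    // pc23: the output coder must also be able to encode pc3's nulls, so it is the
    // nullable variant; a runner may insert a (possibly virtual) identity step on
    // the pc2 edge to recode its VarLongCoder bytes as NullableCoder(VarLongCoder)
    // bytes.
    PCollection<Long> pc23 =
        PCollectionList.of(pc2).and(pc3)
            .apply("Flatten23", Flatten.pCollections())
            .setCoder(NullableCoder.of(VarLongCoder.of()));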


On Thu, Dec 19, 2019 at 3:09 PM Luke Cwik <lc...@google.com> wrote:
I'm pretty sure that Flatten with different coders is well defined.
input: List<PCollection<T>>
output: PCollection<T>

When flatten is executed using T vs encoded(T), transcoding can be optimized because the coder
for the output PCollection<T> is assumed to be able to encode all T's. The DirectRunner
specifically does this transcoding check on elements to help pipeline authors catch this kind
of error. Alternatively, an SDK could require a method like "bool canEncode(T)" on
coders, which could be very cheap, to ensure that values can be transcoded (this would work for
many but not all value types). When the execution is occurring on encoded(T), then the bytes
need to be transcoded somehow, since the downstream transform is expected to get an encoding
compatible with the output PCollection's encoding.
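
A minimal sketch of the byte-level transcoding described above, using Beam's CoderUtils (the helper and its name are mine): decode with the coder that produced the bytes, then re-encode with the output PCollection's coder.

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.util.CoderUtils;

    /** Illustrative helper: re-encode bytes from the input coder to the output coder. */
    class Transcode {
      static <T> byte[] transcode(Coder<T> inputCoder, Coder<T> outputCoder, byte[] encoded)
          throws CoderException {
        // A runner working on encoded(T) must decode with the coder that produced
        // the bytes and re-encode with the output PCollection's coder.
        T value = CoderUtils.decodeFromByteArray(inputCoder, encoded);
        return CoderUtils.encodeToByteArray(outputCoder, value);
      }
    }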

The example that flattens Nullable<Long> and Long would be valid, since the
output PCollection accepts all the supported input types.

I believe all runners need to transcode if they are operating on encoded(T) 
when the input PCollection coder is not the same as the output PCollection 
coder. If they are operating on T's, then it's optional, since it's a choice
between performance and debuggability.


On Wed, Dec 11, 2019 at 3:47 AM Etienne Chauchot <echauc...@apache.org> wrote:
Ok,

Thanks Kenn.

The Flatten javadoc says that by default the coder of the output should be the
coder of the first input. But in the test, it sets the output coder to
something different. Waiting for a consensus on this model point and a common
implementation in the runners, I'll just exclude this test as the other runners do.
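
For illustration, assuming the two input PCollection<Long>s of the test (pc1 with NullableCoder(BigEndianLongCoder) and pc2 with VarLongCoder), the difference is:

    // Default per the javadoc: the flattened PCollection picks up the coder of
    // the first input, here NullableCoder(BigEndianLongCoder).
    PCollection<Long> flattenedDefault =
        PCollectionList.of(pc1).and(pc2).apply("FlattenDefault", Flatten.pCollections());

    // The test instead overrides the output coder with a third coder:
    PCollection<Long> flattenedOverridden =
        PCollectionList.of(pc1).and(pc2)
            .apply("FlattenOverridden", Flatten.pCollections())
            .setCoder(NullableCoder.of(VarLongCoder.of()));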

Etienne

On 11/12/2019 04:46, Kenneth Knowles wrote:

It is a good point. Nullable(VarLong) and VarLong are two different types, with a least
upper bound of Nullable(VarLong). BigEndianLong and VarLong are two different types,
with no least upper bound in the "coders" type system. Yet we understand that
the values they encode are equal. I do not think it is clearly formalized anywhere what
the rules are (corollary: they have not been thought about carefully).

I think both possibilities are reasonable:

1. Make the rule that Flatten only accepts inputs with identical coders. This will
sometimes be annoying, requiring vacuous "re-encode" noop ParDos (they will be
fused away on maybe all runners); a sketch of such a noop follows this list.
2. Define types as the domain of values, and Flatten accepts sets of PCollections with 
the same domain of values. Runners must "do whatever it takes" to respect the 
coders on the collection.
2a. For very simple cases, Flatten takes the least upper bound of the input 
types. The output coder of Flatten has to be this least upper bound. For 
example, a non-nullable output coder would be an error.
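
For option 1, a sketch of such a vacuous "re-encode" noop (illustrative, not an existing Beam utility): an identity ParDo placed in front of the Flatten so that the intermediate PCollection carries the common coder.

    import org.apache.beam.sdk.coders.NullableCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    /** Illustrative identity ParDo whose only purpose is to change the coder between steps. */
    class Reencode {
      static PCollection<Long> asNullableVarLong(PCollection<Long> input) {
        return input
            .apply("ReencodeNoop", ParDo.of(new DoFn<Long, Long>() {
              @ProcessElement
              public void processElement(@Element Long element, OutputReceiver<Long> out) {
                out.output(element); // identity: only the output coder changes
              }
            }))
            .setCoder(NullableCoder.of(VarLongCoder.of()));
      }
    }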

Very interesting and nuanced problem. Flatten just became quite an interesting 
transform, for me :-)

Kenn

On Tue, Dec 10, 2019 at 12:37 AM Etienne Chauchot <echauc...@apache.org> wrote:
Hi all,

I have a question about the testFlattenMultipleCoders test:

This test uses 2 collections:

1. long and null data encoded using NullableCoder(BigEndianLongCoder)

2. long data encoded using VarLongCoder

It then flattens the 2 collections and sets the coder of the resulting
collection to NullableCoder(VarLongCoder), as sketched below.
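
For reference, the shape of the test is roughly the following (inside a test method with a TestPipeline p; element values simplified):

    // 1. long and null data encoded with NullableCoder(BigEndianLongCoder)
    PCollection<Long> bigEndianLongs =
        p.apply("BigEndianLongs",
            Create.of(0L, 1L, (Long) null, 2L)
                .withCoder(NullableCoder.of(BigEndianLongCoder.of())));

    // 2. long data encoded with VarLongCoder
    PCollection<Long> varLongs =
        p.apply("VarLongs", Create.of(0L, 1L, 2L).withCoder(VarLongCoder.of()));

    // Flatten the 2 collections and set the output coder to NullableCoder(VarLongCoder)
    PCollection<Long> flattened =
        PCollectionList.of(bigEndianLongs).and(varLongs)
            .apply(Flatten.pCollections())
            .setCoder(NullableCoder.of(VarLongCoder.of()));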

Most runners translate flatten as a simple union of the 2 PCollections
without any re-encoding. As a result, most runners exclude this test
from the test set because of coder issues. For example, Flink raises an
exception in its flatten translation if the type of the elements in
PCollection1 is different from the type in PCollection2. Other examples
are the direct runner and the Spark (RDD based) runner, which do not
exclude this test simply because they don't need to serialize elements,
so they don't even call the coders.

That means that having an output PCollection of the flatten with
heterogeneous coders is not really tested, so it is not really supported.

Should we drop this test case (which no runner executes) or should
we force each runner to re-encode?

Best

Etienne


