[ https://issues.apache.org/jira/browse/BEAM-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078407#comment-16078407 ]
Łukasz Gajowy commented on BEAM-1820:
-------------------------------------

Below is a list of PTransforms that assume Source.getDefaultOutputCoder() is not nullable:
- UnboundedReadFromBoundedSource
- StreamingBoundedRead
- StreamingUnboundedRead
- BoundedReadFromUnboundedSource
- Read.Bounded
- Read.Unbounded
- StreamingUnboundedRead/ReadWithIds (DataflowRunner)

The coder in the first four PTransforms can be obtained during the expand() method call, from the Read.from() performed there. When I modify them this way, all the tests pass. Below is an example of how I do this (based on `UnboundedReadFromBoundedSource`):

```
// Coder taken from the PCollection produced in expand(); null until expand() runs.
private Coder<T> outputCoder;

@Override
public PCollection<T> expand(PBegin input) {
  PCollection<T> collection =
      input.getPipeline().apply(Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
  outputCoder = collection.getCoder();
  return collection;
}

@Override
protected Coder<T> getDefaultOutputCoder() {
  return outputCoder;
}
```

The other three PTransforms fail the tests when I try to make a similar change, because the coder is not being set on the PCollection. I therefore have the following questions, and I would be glad if someone more experienced than me could answer them:

1. Is this way of obtaining the coder correct in the first four cases (this is how I understood the task)? Is it OK that the outputCoder variable remains null until the expand() method executes?
2. What about PTransforms in which the Read.from() in the expand() method does not set the Coder on the PCollection? This is the reason the tests fail in the last three cases I listed.
3. There are also places in Source classes (e.g. MicrobatchSource) that assume a non-null default output coder. I think correcting them is not part of this task because, according to the issue description, it is the transform code that should take care of the Coders. Do I understand correctly?
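To make the question about a null default output coder concrete, here is a minimal sketch of the fallback order the issue description suggests: a coder supplied to the enclosing transform wins, the source's default is used only if present, and a missing coder is reported clearly instead of surfacing later as a NullPointerException. This is a plain-Java model, not Beam code: `CoderResolutionSketch`, `NamedCoder`, `SketchSource` and `resolveCoder` are hypothetical stand-ins introduced only for illustration.

```java
import java.util.Objects;

/** Stand-in model, NOT Beam's API: resolving a coder when the source may return null. */
final class CoderResolutionSketch {

  /** Hypothetical stand-in for a coder; it only carries a name. */
  static final class NamedCoder {
    final String name;

    NamedCoder(String name) {
      this.name = Objects.requireNonNull(name, "name");
    }
  }

  /** Hypothetical stand-in for a source whose default output coder may be null. */
  interface SketchSource {
    NamedCoder getDefaultOutputCoder(); // may return null
  }

  /**
   * Returns the explicitly configured coder if there is one, otherwise the
   * source's default; throws if neither is available.
   */
  static NamedCoder resolveCoder(NamedCoder explicitCoder, SketchSource source) {
    if (explicitCoder != null) {
      return explicitCoder;
    }
    NamedCoder fromSource = source.getDefaultOutputCoder();
    if (fromSource == null) {
      throw new IllegalStateException(
          "No coder available: the source returned null and none was set explicitly");
    }
    return fromSource;
  }

  private CoderResolutionSketch() {}
}
```

With this shape, code that currently dereferences the source's coder directly would go through a single null-aware lookup instead. Whether the real transforms should fail eagerly like this or defer to the coder set on the output PCollection is exactly the open question above.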
> Source.getDefaultOutputCoder() should be @Nullable
> --------------------------------------------------
>
>                 Key: BEAM-1820
>                 URL: https://issues.apache.org/jira/browse/BEAM-1820
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Łukasz Gajowy
>              Labels: easyfix, starter
>
> Source.getDefaultOutputCoder() returns a coder for elements produced by the
> source.
> However, the Source objects are nearly always hidden from the user and
> instead encapsulated in a transform. Often, an enclosing transform has a
> better idea of what coder should be used to encode these elements (e.g. a
> user supplied a Coder to that transform's configuration). In that case, it'd
> be good if Source.getDefaultOutputCoder() could just return null, and coder
> would have to be handled by the enclosing transform or perhaps specified on
> the output of that transform explicitly.
> Right now there's a bunch of code in the SDK and runners that assumes
> Source.getDefaultOutputCoder() returns non-null. That code would need to be
> fixed to instead use the coder set on the collection produced by
> Read.from(source).
> It all appears pretty easy to fix, so this is a good starter item.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)