[ 
https://issues.apache.org/jira/browse/BEAM-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078407#comment-16078407
 ] 

Łukasz Gajowy commented on BEAM-1820:
-------------------------------------

Below is a list of PTransforms that assume Source.getDefaultOutputCoder() 
never returns null:

- UnboundedReadFromBoundedSource
- StreamingBoundedRead
- StreamingUnboundedRead
- BoundedReadFromUnboundedSource
- Read.Bounded 
- Read.Unbounded
- StreamingUnboundedRead/ReadWithIds (DataflowRunner)

In the first four PTransforms, the coder can be obtained during the expand() 
call from the PCollection returned by the Read.from() applied there. When I 
modify them this way, all the tests pass. Below is an example of how I do this 
(based on `UnboundedReadFromBoundedSource`):
```
// Stays null until expand() runs.
private Coder<T> outputCoder;

@Override
public PCollection<T> expand(PBegin input) {
  PCollection<T> collection = input.getPipeline().apply(
      Read.from(new BoundedToUnboundedSourceAdapter<>(source)));

  // Remember the coder carried by the output of the inner Read.
  outputCoder = collection.getCoder();

  return collection;
}

@Override
protected Coder<T> getDefaultOutputCoder() {
  return outputCoder;
}
```

The other three PTransforms fail the tests when I make a similar change, 
because the coder is not set on the resulting PCollection.
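The capture pattern and the failure mode just described can be reproduced in a small toy model. This is a minimal sketch in plain Java; `Coder`, `PCollection`, `readFrom` and `WrappingTransform` are hypothetical stand-ins, not the Beam API. The wrapper's coder is null before expand() runs and is copied from the inner read's output afterwards; when the inner read leaves no coder on its output, the copied value is null as well.

```java
/**
 * Toy model of the expand()-capture pattern. Coder, PCollection, readFrom and
 * WrappingTransform are hypothetical stand-ins, NOT the Beam API.
 */
final class CaptureCoderSketch {

  /** Stand-in for a Coder; it only carries a name. */
  static final class Coder {
    final String name;

    Coder(String name) {
      this.name = name;
    }
  }

  /** Stand-in for a PCollection; a null coder means "no coder was set". */
  static final class PCollection {
    private Coder coder;

    Coder getCoder() {
      return coder;
    }

    void setCoder(Coder coder) {
      this.coder = coder;
    }
  }

  /** Stand-in for Read.from(source): sets a coder on its output only if it has one. */
  static PCollection readFrom(Coder sourceDefaultCoder) {
    PCollection out = new PCollection();
    if (sourceDefaultCoder != null) {
      out.setCoder(sourceDefaultCoder);
    }
    return out;
  }

  /** Wrapper transform that copies the coder from its inner read's output. */
  static final class WrappingTransform {
    private final Coder sourceDefaultCoder;
    private Coder outputCoder; // null until expand() runs

    WrappingTransform(Coder sourceDefaultCoder) {
      this.sourceDefaultCoder = sourceDefaultCoder;
    }

    PCollection expand() {
      PCollection collection = readFrom(sourceDefaultCoder);
      outputCoder = collection.getCoder(); // still null if readFrom set nothing
      return collection;
    }

    Coder getDefaultOutputCoder() {
      return outputCoder;
    }
  }
}
```

In this toy model, capturing the coder in expand() only forwards whatever the inner read produced; it cannot supply a coder that was never set.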

I would be glad if someone more experienced could answer the following 
questions:

1. Is this the correct way to obtain the coder in the first four cases (this is 
how I understood the task)? Is it acceptable that the outputCoder field remains 
null until expand() executes?
2. What should be done for PTransforms in which the Read.from() applied in 
expand() does not set the Coder on the PCollection? This is why the tests fail 
in the last three cases I listed.
3. There are also places in Source classes (e.g. MicrobatchSource) that assume 
a non-null default output coder. I think correcting them is not part of this 
task, because according to the issue description it is the transform code that 
should handle Coders. Do I understand correctly?
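Regarding question 2, the issue description quoted below mentions that the coder could instead be "specified on the output of that transform explicitly". In real Beam code that would be a PCollection.setCoder(...) call inside expand(); the sketch below only models the idea with hypothetical stand-in classes (not the Beam API), under the assumption that a coder configured by the user on the enclosing transform should take precedence over the source's default.

```java
/**
 * Toy model: an enclosing transform sets a coder explicitly on its output.
 * Coder, PCollection and ReadWithCoder are hypothetical, NOT the Beam API.
 */
final class ExplicitCoderSketch {

  /** Stand-in for a Coder; it only carries a name. */
  static final class Coder {
    final String name;

    Coder(String name) {
      this.name = name;
    }
  }

  /** Stand-in for a PCollection; a null coder means "no coder was set". */
  static final class PCollection {
    private Coder coder;

    Coder getCoder() {
      return coder;
    }

    PCollection setCoder(Coder coder) {
      this.coder = coder;
      return this;
    }
  }

  /** Hypothetical enclosing read transform with an optional user-supplied coder. */
  static final class ReadWithCoder {
    private final Coder sourceDefaultCoder; // may be null once sources may return null
    private final Coder userCoder;          // null if the user configured none

    ReadWithCoder(Coder sourceDefaultCoder, Coder userCoder) {
      this.sourceDefaultCoder = sourceDefaultCoder;
      this.userCoder = userCoder;
    }

    PCollection expand() {
      // What the inner read contributes: the source's default coder, possibly null.
      PCollection out = new PCollection().setCoder(sourceDefaultCoder);
      // Assumption: a coder configured on the enclosing transform wins.
      if (userCoder != null) {
        out.setCoder(userCoder);
      }
      // Fail in expand() with a clear message instead of a later NullPointerException.
      if (out.getCoder() == null) {
        throw new IllegalStateException(
            "No coder: the source returned null and none was configured");
      }
      return out;
    }
  }
}
```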


> Source.getDefaultOutputCoder() should be @Nullable
> --------------------------------------------------
>
>                 Key: BEAM-1820
>                 URL: https://issues.apache.org/jira/browse/BEAM-1820
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Łukasz Gajowy
>              Labels: easyfix, starter
>
> Source.getDefaultOutputCoder() returns a coder for elements produced by the 
> source.
> However, the Source objects are nearly always hidden from the user and 
> instead encapsulated in a transform. Often, an enclosing transform has a 
> better idea of what coder should be used to encode these elements (e.g. a 
> user supplied a Coder to that transform's configuration). In that case, it'd 
> be good if Source.getDefaultOutputCoder() could just return null, and coder 
> would have to be handled by the enclosing transform or perhaps specified on 
> the output of that transform explicitly.
> Right now there's a bunch of code in the SDK and runners that assumes 
> Source.getDefaultOutputCoder() returns non-null. That code would need to be 
> fixed to instead use the coder set on the collection produced by 
> Read.from(source).
> It all appears pretty easy to fix, so this is a good starter item.
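The change the issue asks for in SDK and runner code (read the coder set on the collection produced by Read.from(source) instead of assuming the source returns one) can be contrasted in a toy model. Everything here, including `coderFromSource` and `coderFromCollection`, is a hypothetical stand-in, not a Beam or runner API.

```java
/**
 * Toy model of code that looks up a read's coder. Source, PCollection and
 * both lookup helpers are hypothetical, NOT the Beam API.
 */
final class CoderLookupSketch {

  /** Stand-in for a Coder; it only carries a name. */
  static final class Coder {
    final String name;

    Coder(String name) {
      this.name = name;
    }
  }

  /** Stand-in for a Source whose default coder may now be null. */
  static final class Source {
    private final Coder defaultCoder;

    Source(Coder defaultCoder) {
      this.defaultCoder = defaultCoder;
    }

    Coder getDefaultOutputCoder() {
      return defaultCoder;
    }
  }

  /** Stand-in for the PCollection produced by reading from a Source. */
  static final class PCollection {
    final Source source;
    private Coder coder;

    PCollection(Source source) {
      this.source = source;
    }

    Coder getCoder() {
      return coder;
    }

    void setCoder(Coder coder) {
      this.coder = coder;
    }
  }

  /** Old assumption: ask the source directly; fails when it returns null. */
  static Coder coderFromSource(PCollection output) {
    return java.util.Objects.requireNonNull(
        output.source.getDefaultOutputCoder(), "source has no default coder");
  }

  /** What the issue asks for: use the coder set on the read's output collection. */
  static Coder coderFromCollection(PCollection output) {
    Coder coder = output.getCoder();
    if (coder == null) {
      throw new IllegalStateException("no coder set on the output of the read");
    }
    return coder;
  }
}
```

With a source that returns null but an output collection whose coder was set by the enclosing transform, only the collection-based lookup succeeds in this model.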



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)