mxm commented on a change in pull request #13240:
URL: https://github.com/apache/beam/pull/13240#discussion_r516185424
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1194,29 +1200,35 @@ public void verifyDeterministic() throws NonDeterministicException {
private Map<TupleTag<?>, Integer> tagsToIds;
private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
+ private SerializablePipelineOptions pipelineOptions;
Review comment:
```suggestion
private final SerializablePipelineOptions pipelineOptions;
```
##########
File path:
runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
##########
@@ -761,7 +797,9 @@ void testSideInputs(boolean keyed) throws Exception {
doFnOperator,
keySelector,
null,
- new CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
+ new CoderTypeInformation<>(
+ FlinkKeyUtils.ByteBufferCoder.of(),
+ PipelineOptionsFactory.as(FlinkPipelineOptions.class)));
Review comment:
Perhaps create a static factory method for all of these defaults? E.g.
`FlinkPipelineOptions.defaults()` (`default` itself is a reserved word in Java).
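A minimal sketch of what such a factory could look like. It uses hypothetical stand-in classes (`ExampleOptions`, `ExampleTypeInformation`) and made-up default values, not Beam's real `FlinkPipelineOptions` or `CoderTypeInformation`, so treat it as an illustration of the pattern only. The method is named `defaults()` because `default` is a reserved word in Java.

```java
import java.util.Objects;

// Hypothetical stand-in for an options class; this is NOT Beam's API.
final class ExampleOptions {
  private final String runnerName;
  private final boolean fasterCopy;

  private ExampleOptions(String runnerName, boolean fasterCopy) {
    this.runnerName = runnerName;
    this.fasterCopy = fasterCopy;
  }

  // Single place that defines the defaults used across tests.
  // The values here are illustrative assumptions.
  static ExampleOptions defaults() {
    return new ExampleOptions("example-runner", false);
  }

  String getRunnerName() {
    return runnerName;
  }

  boolean isFasterCopy() {
    return fasterCopy;
  }
}

// Hypothetical consumer that requires options at construction time.
final class ExampleTypeInformation {
  private final ExampleOptions options;

  ExampleTypeInformation(ExampleOptions options) {
    this.options = Objects.requireNonNull(options, "options");
  }

  ExampleOptions getOptions() {
    return options;
  }
}
```

Each test call site would then shrink to something like `new ExampleTypeInformation(ExampleOptions.defaults())`, and a later change to the defaults touches one method instead of every test.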
##########
File path:
runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -82,10 +81,14 @@ public T createInstance() {
@Override
public T copy(T t) {
Review comment:
This is a Flink interface method which we can't rename.
##########
File path:
runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -49,20 +50,18 @@
* org.apache.beam.sdk.io.FileSystems} registration needed for {@link
* org.apache.beam.sdk.transforms.Reshuffle} translation.
*/
- @SuppressWarnings("unused")
- private final @Nullable SerializablePipelineOptions pipelineOptions;
+ private final SerializablePipelineOptions pipelineOptions;
- public CoderTypeSerializer(Coder<T> coder) {
- Preconditions.checkNotNull(coder);
- this.coder = coder;
- this.pipelineOptions = null;
- }
+ private final boolean fasterCopy;
Review comment:
Folks, this is an internal boolean flag which is used in only one place.
If in doubt, one can see what it does by reading the code. We could add a
`/** */` Javadoc field comment for it if you like. IMHO the name is purely
personal preference, and I have nothing against `fasterCopy`, as it clearly
indicates that the `copy()` method will return faster. There is nothing
semantically wrong with that. `zeroCopy` is not 100% accurate because we still
perform a copy for primitive types, but please let's not argue about that.
`DisableExcessCopy` is too opinionated IMHO 😇
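For readers who have not opened the diff, here is a rough sketch of how a flag like this can gate `copy()`. It uses a hypothetical `ExampleCopier` class and Java serialization as the stand-in deep copy. It also assumes that the flag simply skips the defensive copy and returns the input. None of this is taken from the actual `CoderTypeSerializer` code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in; NOT the real CoderTypeSerializer.
final class ExampleCopier<T extends Serializable> {
  // When true, copy() skips the defensive deep copy (assumed semantics).
  private final boolean fasterCopy;

  ExampleCopier(boolean fasterCopy) {
    this.fasterCopy = fasterCopy;
  }

  @SuppressWarnings("unchecked")
  T copy(T value) {
    if (fasterCopy) {
      // Fast path: hand back the same reference. Only safe if callers
      // do not mutate the value afterwards.
      return value;
    }
    // Slow path: deep copy via a serialize/deserialize round trip.
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not copy value", e);
    }
  }
}
```

With the flag set, `copy()` does no serialization work at all, which is the sense in which it "returns faster". The trade-off is that the caller and the receiver then share one object.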