[FLINK-6246] Fix generic type of OutputTag in operator Output. This closes #3662.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bdbe607 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bdbe607 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bdbe607 Branch: refs/heads/table-retraction Commit: 9bdbe6071f1946391598709bfa637fd76a8c7396 Parents: 48ce77c Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 3 16:10:14 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Apr 5 20:43:00 2017 +0200 ---------------------------------------------------------------------- .../api/collector/selector/DirectedOutput.java | 2 +- .../api/operators/AbstractStreamOperator.java | 2 +- .../flink/streaming/api/operators/Output.java | 2 +- .../streaming/runtime/io/RecordWriterOutput.java | 2 +- .../streaming/runtime/tasks/OperatorChain.java | 17 ++++++----------- .../runtime/tasks/StreamIterationTail.java | 2 +- .../util/AbstractStreamOperatorTestHarness.java | 2 +- .../flink/streaming/util/CollectorOutput.java | 2 +- .../apache/flink/streaming/util/MockOutput.java | 2 +- 9 files changed, 14 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java index dabe804..6339506 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java @@ -140,7 +140,7 @@ public class DirectedOutput<OUT> 
implements Output<StreamRecord<OUT>> { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { throw new UnsupportedOperationException("Cannot use split/select with side outputs."); } http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index cc81a0e..7569170 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -872,7 +872,7 @@ public abstract class AbstractStreamOperator<OUT> } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { numRecordsOut.inc(); output.collect(outputTag, record); } http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java index eb10d8d..7035d28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java @@ -48,7 +48,7 @@ public interface Output<T> extends Collector<T> { * * @param 
record The record to collect. */ - <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record); + <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record); void emitLatencyMarker(LatencyMarker latencyMarker); } http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index d22c60d..365c78e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -85,7 +85,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { if (this.outputTag == null || !this.outputTag.equals(outputTag)) { // we are only responsible for emitting to the side-output specified by our // OutputTag. 
http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 499d4a3..be4b456 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -423,7 +423,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { if (this.outputTag == null || !this.outputTag.equals(outputTag)) { // we are only responsible for emitting to the side-output specified by our // OutputTag. @@ -506,7 +506,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { if (this.outputTag == null || !this.outputTag.equals(outputTag)) { // we are only responsible for emitting to the side-output specified by our // OutputTag. 
@@ -579,7 +579,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (Output<StreamRecord<T>> output : outputs) { output.collect(outputTag, record); } @@ -619,21 +619,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs[i]; - // due to side outputs, StreamRecords of varying types can pass through the broadcasting - // collector so we need to cast - @SuppressWarnings({"unchecked", "rawtypes"}) - StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue()); + StreamRecord<X> shallowCopy = record.copy(record.getValue()); output.collect(outputTag, shallowCopy); } // don't copy for the last output - @SuppressWarnings({"unchecked", "rawtypes"}) - StreamRecord<T> castRecord = (StreamRecord<T>) record; - outputs[outputs.length - 1].collect(outputTag, castRecord); + outputs[outputs.length - 1].collect(outputTag, record); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 35d14e7..e40f834 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -123,7 +123,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { throw new UnsupportedOperationException("Side outputs not used in iteration tail"); } http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 912d579..a2a4f79 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -656,7 +656,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig); ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag); http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java index 
07b37c8..bd929da 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java @@ -54,7 +54,7 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { throw new UnsupportedOperationException("Side output not supported for CollectorOutput"); } http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java index 867080c..8c3226b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java @@ -41,7 +41,7 @@ public class MockOutput<T> implements Output<StreamRecord<T>> { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { throw new UnsupportedOperationException("Side output not supported for MockOutput"); }