[FLINK-6246] Fix generic type of OutputTag in operator Output

This closes #3662.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bdbe607
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bdbe607
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bdbe607

Branch: refs/heads/table-retraction
Commit: 9bdbe6071f1946391598709bfa637fd76a8c7396
Parents: 48ce77c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Apr 3 16:10:14 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 5 20:43:00 2017 +0200

----------------------------------------------------------------------
 .../api/collector/selector/DirectedOutput.java     |  2 +-
 .../api/operators/AbstractStreamOperator.java      |  2 +-
 .../flink/streaming/api/operators/Output.java      |  2 +-
 .../streaming/runtime/io/RecordWriterOutput.java   |  2 +-
 .../streaming/runtime/tasks/OperatorChain.java     | 17 ++++++-----------
 .../runtime/tasks/StreamIterationTail.java         |  2 +-
 .../util/AbstractStreamOperatorTestHarness.java    |  2 +-
 .../flink/streaming/util/CollectorOutput.java      |  2 +-
 .../apache/flink/streaming/util/MockOutput.java    |  2 +-
 9 files changed, 14 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index dabe804..6339506 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -140,7 +140,7 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
        }
 
        @Override
-       public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+       public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                throw new UnsupportedOperationException("Cannot use split/select with side outputs.");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index cc81a0e..7569170 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -872,7 +872,7 @@ public abstract class AbstractStreamOperator<OUT>
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        numRecordsOut.inc();
                        output.collect(outputTag, record);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index eb10d8d..7035d28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -48,7 +48,7 @@ public interface Output<T> extends Collector<T> {
         *
         * @param record The record to collect.
         */
-       <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record);
+       <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record);
 
        void emitLatencyMarker(LatencyMarker latencyMarker);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index d22c60d..365c78e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -85,7 +85,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
        }
 
        @Override
-       public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+       public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                        // we are only responsible for emitting to the side-output specified by our
                        // OutputTag.

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 499d4a3..be4b456 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -423,7 +423,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                                // we are only responsible for emitting to the side-output specified by our
                                // OutputTag.
@@ -506,7 +506,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
                                // we are only responsible for emitting to the side-output specified by our
                                // OutputTag.
@@ -579,7 +579,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        for (Output<StreamRecord<T>> output : outputs) {
                                output.collect(outputTag, record);
                        }
@@ -619,21 +619,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        for (int i = 0; i < outputs.length - 1; i++) {
                                Output<StreamRecord<T>> output = outputs[i];
 
-                               // due to side outputs, StreamRecords of varying types can pass through the broadcasting
-                               // collector so we need to cast
-                               @SuppressWarnings({"unchecked", "rawtypes"})
-                               StreamRecord<T> shallowCopy = (StreamRecord<T>) record.copy(record.getValue());
+                               StreamRecord<X> shallowCopy = record.copy(record.getValue());
                                output.collect(outputTag, shallowCopy);
                        }
 
                        // don't copy for the last output
-                       @SuppressWarnings({"unchecked", "rawtypes"})
-                       StreamRecord<T> castRecord = (StreamRecord<T>) record;
-                       outputs[outputs.length - 1].collect(outputTag, castRecord);
+                       outputs[outputs.length - 1].collect(outputTag, record);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 35d14e7..e40f834 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -123,7 +123,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        throw new UnsupportedOperationException("Side outputs not used in iteration tail");
 
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 912d579..a2a4f79 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -656,7 +656,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                }
 
                @Override
-               public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+               public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                        sideOutputSerializer = TypeExtractor.getForObject(record.getValue()).createSerializer(executionConfig);
 
                        ConcurrentLinkedQueue<Object> sideOutputList = sideOutputLists.get(outputTag);

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index 07b37c8..bd929da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -54,7 +54,7 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
        }
 
        @Override
-       public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+       public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bdbe607/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 867080c..8c3226b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -41,7 +41,7 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
        }
 
        @Override
-       public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) {
+       public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                throw new UnsupportedOperationException("Side output not supported for MockOutput");
        }
 

Reply via email to