[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925923#comment-15925923
 ] 

ASF GitHub Bot commented on FLINK-4460:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r106139311
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
    @@ -326,33 +327,46 @@ public int getChainLength() {
                        Map<Integer, StreamConfig> chainedConfigs,
                        ClassLoader userCodeClassloader,
                        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
    -                   List<StreamOperator<?>> allOperators)
    +                   List<StreamOperator<?>> allOperators,
    +                   OutputTag<IN> outputTag)
        {
                // create the output that the operator writes to first. this 
may recursively create more operators
                Output<StreamRecord<OUT>> output = createOutputCollector(
                                containingTask, operatorConfig, chainedConfigs, 
userCodeClassloader, streamOutputs, allOperators);
     
                // now create the operator and give it the output collector to 
write its output to
                OneInputStreamOperator<IN, OUT> chainedOperator = 
operatorConfig.getStreamOperator(userCodeClassloader);
    +
                chainedOperator.setup(containingTask, operatorConfig, output);
     
                allOperators.add(chainedOperator);
     
                if (containingTask.getExecutionConfig().isObjectReuseEnabled()) 
{
    -                   return new ChainingOutput<>(chainedOperator, this);
    +                   return new ChainingOutput<>(chainedOperator, this, 
outputTag);
                }
                else {
                        TypeSerializer<IN> inSerializer = 
operatorConfig.getTypeSerializerIn1(userCodeClassloader);
    -                   return new CopyingChainingOutput<>(chainedOperator, 
inSerializer, this);
    +                   return new CopyingChainingOutput<>(chainedOperator, 
inSerializer, outputTag, this);
                }
        }
        
        private <T> RecordWriterOutput<T> createStreamOutput(
                        StreamEdge edge, StreamConfig upStreamConfig, int 
outputIndex,
                        Environment taskEnvironment,
    -                   String taskName)
    -   {
    -           TypeSerializer<T> outSerializer = 
upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
    +                   String taskName) {
    +           OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, 
return null if not sideOutput
    +
    +           TypeSerializer outSerializer = null;
    +
    +           if (edge.getOutputTag() != null) {
    +                   // side output
    +                   outSerializer = upStreamConfig.getTypeSerializerSideOut(
    +                                   edge.getOutputTag(), 
taskEnvironment.getUserClassLoader());
    +           } else {
    +                   // main output
    --- End diff --
    
    this can become one line.


> Side Outputs in Flink
> ---------------------
>
>                 Key: FLINK-4460
>                 URL: https://issues.apache.org/jira/browse/FLINK-4460
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core, DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Chen Qin
>            Assignee: Chen Qin
>              Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to