Jingsong Lee created FLINK-11974:
------------------------------------

             Summary: Introduce StreamOperatorSubstitutor to help table perform 
the whole Operator CodeGen
                 Key: FLINK-11974
                 URL: https://issues.apache.org/jira/browse/FLINK-11974
             Project: Flink
          Issue Type: New Feature
          Components: Runtime / Operators
            Reporter: Jingsong Lee
            Assignee: Jingsong Lee


If we need CodeGen an entire Operator, one possible solution is to introduce an 
OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's 
open, and then proxy all methods to the sub-Operator.

Doing so results in multiple virtual function calls, so we introduce a 
StreamOperatorSubstitutor:
{code:java}
/**
 * Basic interface for stream operator substitutes. It is transferred to the 
streamTask by
 * serialization, and produce an actual stream operator to the streamTask, who 
uses the actual
 * stream operator to run.
 *
 * @param <OUT> output type of the actual stream operator
 */
public interface StreamOperatorSubstitutor<OUT> {

   /**
    * Produces the actual stream operator.
    *
    * @param userCodeClassLoader the user code class loader to use.
    * @return the actual stream operator created on {@code StreamTask}.
    */
   StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
}
{code}
In StreamConfig.getStreamOperator, we need:
{code:java}
if (operator != null && operator instanceof StreamOperatorSubstitutor) {
   return (T) ((StreamOperatorSubstitutor) 
operator).getActualStreamOperator(cl);
} else {
   return (T) operator;
}
{code}
to get the real operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to