Jingsong Lee created FLINK-11974: ------------------------------------ Summary: Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen Key: FLINK-11974 URL: https://issues.apache.org/jira/browse/FLINK-11974 Project: Flink Issue Type: New Feature Components: Runtime / Operators Reporter: Jingsong Lee Assignee: Jingsong Lee
If we need CodeGen an entire Operator, one possible solution is to introduce an OperatorWrapper, then generate a CodeGen sub-Operator in OperatorWrapper's open, and then proxy all methods to the sub-Operator. Doing so results in multiple virtual function calls, so we introduce a StreamOperatorSubstitutor: {code:java} /** * Basic interface for stream operator substitutes. It is transferred to the streamTask by * serialization, and produce an actual stream operator to the streamTask, who uses the actual * stream operator to run. * * @param <OUT> output type of the actual stream operator */ public interface StreamOperatorSubstitutor<OUT> { /** * Produces the actual stream operator. * * @param userCodeClassLoader the user code class loader to use. * @return the actual stream operator created on {@code StreamTask}. */ StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader); } {code} In StreamConfig.getStreamOperator, we need: {code:java} if (operator != null && operator instanceof StreamOperatorSubstitutor) { return (T) ((StreamOperatorSubstitutor) operator).getActualStreamOperator(cl); } else { return (T) operator; } {code} to get the real operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)