godfreyhe opened a new pull request #13839:
URL: https://github.com/apache/flink/pull/13839


   
   
   ## What is the purpose of the change
   
   *The multiple input operator for batch was implemented in [FLINK-19627](https://issues.apache.org/jira/browse/FLINK-19627). This PR introduces a multiple input operator for streaming. Unlike the batch version, the streaming multiple input operator must also handle key selectors and state, checkpoint, and barrier operations. Giving each sub-operator its own state handler would require many changes in the runtime, so all sub-operators share a single state handler instead. This raises three issues:*
   1. `stateHandler` in `AbstractStreamOperatorV2` is `private`. It must be changed to `protected` so that `StreamMultipleInputStreamOperator` can access it.
   2. `stateHandler` in `AbstractStreamOperator` is created inside the `initializeState` method. A new method is needed that passes the `StreamMultipleInputStreamOperator`'s `stateHandler` to `AbstractStreamOperator`, so that all sub-operators share the same `stateHandler` instance.
   3. `StreamMultipleInputStreamOperator` may contain several operators of the same type (e.g. two aggregate operators), which register states with the same name. Because these operators share one state handler, their states collide and yield wrong values. We introduce `StateNameAware` and `StateNameContext` to ensure that all state names within a multiple input operator are unique.
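   The third issue can be illustrated with a minimal, self-contained Java sketch. All names in it (`SharedStateStore`, `StateNamePrefixer`, `uniquePrefix`, `CountAggregate`) are hypothetical simplifications and not the actual `StateNameAware`/`StateNameContext` API of this PR: a plain map stands in for the shared state handler, and an "operator name + counter" prefix is only an assumed way of making state names unique.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not the real StateNameAware/StateNameContext API.
public class StateNameDemo {

    /** Stand-in for the shared state handler: one map keyed by state name. */
    static final class SharedStateStore {
        private final Map<String, Long> states = new HashMap<>();

        long get(String name) {
            return states.getOrDefault(name, 0L);
        }

        void put(String name, long value) {
            states.put(name, value);
        }

        int size() {
            return states.size();
        }
    }

    /** Hands out a distinct prefix to every sub-operator that asks for one (assumed scheme). */
    static final class StateNamePrefixer {
        private int nextId = 0;

        String uniquePrefix(String operatorName) {
            return operatorName + "_" + (nextId++) + "$";
        }
    }

    /** Hypothetical counting aggregate that always uses the state name "count". */
    static final class CountAggregate {
        private static final String STATE_NAME = "count";
        private final SharedStateStore store;
        private final String prefix;

        CountAggregate(SharedStateStore store, String prefix) {
            this.store = store;
            this.prefix = prefix;
        }

        void processElement() {
            String name = prefix + STATE_NAME;
            store.put(name, store.get(name) + 1);
        }

        long count() {
            return store.get(prefix + STATE_NAME);
        }
    }

    public static void main(String[] args) {
        // Same-type sub-operators without unique names collide in the shared store.
        SharedStateStore clashing = new SharedStateStore();
        CountAggregate c1 = new CountAggregate(clashing, "");
        CountAggregate c2 = new CountAggregate(clashing, "");
        c1.processElement();
        c1.processElement();
        c2.processElement();
        System.out.println(c1.count() + " " + c2.count()); // prints "3 3"

        // With unique prefixes each sub-operator keeps its own state.
        SharedStateStore shared = new SharedStateStore();
        StateNamePrefixer prefixer = new StateNamePrefixer();
        CountAggregate a1 = new CountAggregate(shared, prefixer.uniquePrefix("agg"));
        CountAggregate a2 = new CountAggregate(shared, prefixer.uniquePrefix("agg"));
        a1.processElement();
        a1.processElement();
        a2.processElement();
        System.out.println(a1.count() + " " + a2.count()); // prints "2 1"
    }
}
```

   The first pair shows the bug described in issue 3: both aggregates read and write the same `count` entry, so each reports the combined value. Prefixing is just one possible scheme; any mechanism that guarantees distinct names per sub-operator would avoid the collision.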
   
   
   ## Brief change log
   
     - *Rename `BatchExecMultipleInputNode`/`StreamExecMultipleInputNode` to `BatchExecMultipleInput`/`StreamExecMultipleInput`*
     - *Support multiple input nodes in the stream planner*
     - *Introduce an `AbstractStreamOperator#initializeState(StreamOperatorStateHandler, InternalTimeServiceManager<?>)` method so that `AbstractStreamOperator` can use an existing state handler*
     - *Change the access modifier of `stateHandler` and `timeServiceManager` in `AbstractStreamOperatorV2` from `private` to `protected` so that subclasses of `AbstractStreamOperatorV2` can access them*
     - *Introduce multiple input operator for streaming*
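   The state-handler sharing introduced by the change log can be sketched in plain Java. The classes below (`StateHandler`, `SubOperator`, `OperatorV2`, `MultipleInputOperator`) are hypothetical stand-ins for `StreamOperatorStateHandler`, `AbstractStreamOperator`, `AbstractStreamOperatorV2` and `StreamMultipleInputStreamOperator`; the real Flink classes have different constructors and signatures (e.g. the new overload also takes an `InternalTimeServiceManager<?>`), which this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: simplified stand-ins, not Flink's real operator classes.
public class SharedHandlerDemo {

    /** Stand-in for StreamOperatorStateHandler. */
    static final class StateHandler {}

    /** Stand-in for AbstractStreamOperator (a sub-operator). */
    static class SubOperator {
        private StateHandler stateHandler;

        /** Original behaviour: every operator creates its own handler. */
        void initializeState() {
            this.stateHandler = new StateHandler();
        }

        /** New overload: reuse a handler created by the enclosing operator. */
        void initializeState(StateHandler existing) {
            this.stateHandler = existing;
        }

        StateHandler getStateHandler() {
            return stateHandler;
        }
    }

    /** Stand-in for AbstractStreamOperatorV2; the field is protected so subclasses can read it. */
    static class OperatorV2 {
        protected StateHandler stateHandler = new StateHandler();
    }

    /** Stand-in for StreamMultipleInputStreamOperator. */
    static final class MultipleInputOperator extends OperatorV2 {
        final List<SubOperator> subOperators = new ArrayList<>();

        MultipleInputOperator(int n) {
            for (int i = 0; i < n; i++) {
                subOperators.add(new SubOperator());
            }
        }

        void initializeState() {
            // Hand the inherited handler to every sub-operator instead of letting each create one.
            for (SubOperator op : subOperators) {
                op.initializeState(stateHandler);
            }
        }
    }

    public static void main(String[] args) {
        MultipleInputOperator op = new MultipleInputOperator(3);
        op.initializeState();
        boolean allShared = op.subOperators.stream()
                .allMatch(sub -> sub.getStateHandler() == op.stateHandler);
        System.out.println(allShared); // prints "true"
    }
}
```

   Keeping the original no-argument `initializeState()` means standalone operators behave as before; only the enclosing multiple input operator opts into the shared instance.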
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *The first, third and fourth commits are trivial reworks and are already covered by existing tests*
     - *The second commit is verified by `TableOperatorWrapperGeneratorTest`, `TableOperatorWrapperTest` and `ExplainTest`*
     - *The last commit is verified by `InputTest`, `OutputTest` and `StreamMultipleInputStreamOperatorTest`*
    
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   