godfreyhe opened a new pull request #13839: URL: https://github.com/apache/flink/pull/13839
## What is the purpose of the change

The multiple input operator for batch was implemented in [FLINK-19627](https://issues.apache.org/jira/browse/FLINK-19627). This PR introduces a multiple input operator for streaming. Unlike the batch multiple input operator, the streaming multiple input operator must handle key selectors as well as state, checkpoint and barrier operations. Giving each sub-operator of a multiple input operator its own state handler would require many changes in the runtime, so all sub-operators share the same state handler. There are three issues we need to resolve:

1. `stateHandler` in `AbstractStreamOperatorV2` is private. We need to change it to protected so that `StreamMultipleInputStreamOperator` can access it.
2. `stateHandler` in `AbstractStreamOperator` is created when the `initializeState` method is called. We need to add a new method that passes the `StreamMultipleInputStreamOperator`'s `stateHandler` to `AbstractStreamOperator`, so that the sub-operators share the same `stateHandler` instance.
3. `StreamMultipleInputStreamOperator` may contain operators of the same type (e.g. two aggregate operators), which use the same state names. This can lead to wrong state values. We introduce `StateNameAware` and `StateNameContext` to make sure all state names in a multiple input operator are unique.
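To make the shared-handler and unique-naming ideas concrete, here is a toy Java model. It is not Flink code and does not use Flink's API: `SharedStateHandler`, `CountAggregate` and `MultipleInputDemo` are hypothetical names, and `StateNameContext`/`StateNameAware` only borrow their names from this PR, so the real classes may look quite different. The `<type>_<index>_` prefix scheme is likewise an assumption. Two sub-operators of the same type each keep a state called `count` in one shared handler; the per-operator prefix keeps their values apart.

```java
import java.util.HashMap;
import java.util.Map;

// Entry point first so the file also runs with `java MultipleInputDemo.java`.
final class MultipleInputDemo {
    public static void main(String[] args) {
        SharedStateHandler handler = new SharedStateHandler(); // one handler for all sub-operators
        StateNameContext names = new StateNameContext();

        CountAggregate left = new CountAggregate();
        CountAggregate right = new CountAggregate(); // same operator type as `left`
        for (CountAggregate op : new CountAggregate[] {left, right}) {
            op.setStateNamePrefix(names.uniquePrefix("agg"));
            op.initializeState(handler); // reuse the shared handler instead of creating one
        }

        left.processElement();
        left.processElement();
        right.processElement();

        // Prints "agg_0_count=2" and then "agg_1_count=1"
        System.out.println(left.stateName() + "=" + handler.get(left.stateName()));
        System.out.println(right.stateName() + "=" + handler.get(right.stateName()));
    }
}

// Hypothetical stand-in for the single state handler owned by the multiple
// input operator; NOT Flink's StreamOperatorStateHandler.
final class SharedStateHandler {
    private final Map<String, Long> valueStates = new HashMap<>();

    long get(String stateName) {
        return valueStates.getOrDefault(stateName, 0L);
    }

    void put(String stateName, long value) {
        valueStates.put(stateName, value);
    }
}

// Hypothetical: hands out a prefix that is unique per operator type and index
// within one multiple input operator.
final class StateNameContext {
    private final Map<String, Integer> seen = new HashMap<>();

    String uniquePrefix(String operatorType) {
        int index = seen.merge(operatorType, 1, Integer::sum) - 1;
        return operatorType + "_" + index + "_";
    }
}

// Hypothetical: implemented by sub-operators that accept a state name prefix.
interface StateNameAware {
    void setStateNamePrefix(String prefix);
}

// Toy "aggregate" sub-operator keeping a running count in a named state.
final class CountAggregate implements StateNameAware {
    private String prefix = "";
    private SharedStateHandler stateHandler;

    @Override
    public void setStateNamePrefix(String prefix) {
        this.prefix = prefix;
    }

    // Models an initializeState variant that receives an existing handler.
    void initializeState(SharedStateHandler sharedHandler) {
        this.stateHandler = sharedHandler;
    }

    String stateName() {
        return prefix + "count";
    }

    void processElement() {
        stateHandler.put(stateName(), stateHandler.get(stateName()) + 1);
    }
}
```

If both operators used the bare name `count`, the shared handler would hold a single value of 3 instead of two separate counts, which is the kind of wrong state value that unique names prevent.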
## Brief change log

- *Rename BatchExecMultipleInputNode/StreamExecMultipleInputNode to BatchExecMultipleInput/StreamExecMultipleInput*
- *Support the multiple input node in the stream planner*
- *Introduce the `AbstractStreamOperator#initializeState(StreamOperatorStateHandler, InternalTimeServiceManager<?>)` method so that AbstractStreamOperator can use an existing state handler*
- *Change the access modifier of `stateHandler` and `timeServiceManager` in AbstractStreamOperatorV2 from `private` to `protected` so that subclasses of AbstractStreamOperatorV2 can access these fields*
- *Introduce the multiple input operator for streaming*

## Verifying this change

This change added tests and can be verified as follows:

- *The first, third and fourth commits are trivial reworks that are already covered by existing tests*
- *The second commit is verified by TableOperatorWrapperGeneratorTest, TableOperatorWrapperTest and ExplainTest*
- *The last commit is verified by InputTest, OutputTest and StreamMultipleInputStreamOperatorTest*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
