StephanEwen opened a new pull request #12155: URL: https://github.com/apache/flink/pull/12155
## What is the purpose of the change

This PR contains various small fixes and improvements for the `SourceOperator` implementation. None of them changes user-facing behavior; they are only improvements for easier future maintenance.

## Brief change log

### (1) Simplify State Access in SourceOperator

The `SourceOperator` has some boilerplate code that takes the bytes out of the `ListState<byte[]>` and applies the `SimpleVersionedSerializer` to turn them into splits. This change encapsulates that code in a utility class `SimpleVersionedListState<SplitT>`, which wraps a `ListState<byte[]>` and applies the serialization and deserialization.

### (2) Initialization of the SourceOperator

Before this change, the `SourceOperator` takes a `Source` in the constructor. All actual components that the `SourceOperator` relies on when working are lazily initialized, in `open()` or via setters. This change moves towards more eager initialization, as is the purpose of the new `SourceOperatorFactory`-based approach.

Relying on something as broad as `Source` also means that a lot of redundant context has to be provided to the `SourceOperator` during initialization. The `Source` is, for example, also responsible for the `SourceEnumerator`, which is independent of the `SourceOperator`. However, it needed to be considered during testing, because the tests need to mock a full `Source` in order to instantiate a `SourceOperator`.

This change passes the collaborators of the `SourceOperator` directly and eagerly into the constructor. This is not fully possible for the `SourceReader`, but for that we can still reduce the scope by passing a targeted factory function.

## Verifying this change

This PR does not change any behavior, only internal design. The functionality is already covered by existing unit tests. The PR adjusts the relevant tests where needed.
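For readers who want a concrete picture of change (1), the following is a minimal, self-contained sketch of the wrapping idea, not the code in this PR. It assumes a plain `List<byte[]>` as a stand-in for Flink's `ListState<byte[]>`, and the names `VersionedSerializerSketch`, `VersionedListStateSketch`, and `StringSplitSerializerSketch` are hypothetical. The 4-byte version prefix is likewise an assumption made for illustration and may differ from the byte layout Flink actually uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the general shape of a versioned serializer.
interface VersionedSerializerSketch<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical typed view over raw byte[] entries. A plain List<byte[]> stands in
// for the byte-array list state; callers only ever see values of type T.
final class VersionedListStateSketch<T> {
    private final List<byte[]> rawState;
    private final VersionedSerializerSketch<T> serializer;

    VersionedListStateSketch(List<byte[]> rawState, VersionedSerializerSketch<T> serializer) {
        this.rawState = rawState;
        this.serializer = serializer;
    }

    // Serializes the value and stores it as [4-byte version][payload] (assumed layout).
    void add(T value) {
        try {
            byte[] payload = serializer.serialize(value);
            ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
            buf.putInt(serializer.getVersion());
            buf.put(payload);
            rawState.add(buf.array());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads every raw entry back, passing the stored version to the serializer.
    List<T> get() {
        List<T> result = new ArrayList<>(rawState.size());
        try {
            for (byte[] entry : rawState) {
                ByteBuffer buf = ByteBuffer.wrap(entry);
                int version = buf.getInt();
                byte[] payload = new byte[buf.remaining()];
                buf.get(payload);
                result.add(serializer.deserialize(version, payload));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}

// Toy serializer that treats a "split" as a UTF-8 string (illustration only).
final class StringSplitSerializerSketch implements VersionedSerializerSketch<String> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(String split) {
        return split.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unknown version: " + version);
        }
        return new String(serialized, StandardCharsets.UTF_8);
    }
}
```

With a wrapper of this kind, the operator code can call `add(split)` and `get()` on typed splits and never touch the `byte[]` representation directly, which is the boilerplate that change (1) removes.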
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**