StephanEwen opened a new pull request #12155:
URL: https://github.com/apache/flink/pull/12155


   ## What is the purpose of the change
   
   This PR contains various small fixes and improvements for the 
`SourceOperator` implementation.
   None of these changes affect user-facing behavior; they are internal 
improvements that make future maintenance easier.
   
   ## Brief change log
   
   ### (1) Simplify State Access in SourceOperator
   
   The `SourceOperator` has some boilerplate code that takes the bytes out of 
the `ListState<byte[]>` and applies the `SimpleVersionedSerializer` to turn 
them into the splits.
   
   This change encapsulates that code in a utility class 
`SimpleVersionedListState<SplitT>` which wraps a `ListState<byte[]>` and 
applies the serialization and de-serialization.
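
   The wrapping idea can be sketched as follows. This is only an illustration, 
not the class added by this PR: `VersionedSerializer` and `BytesListState` are 
hypothetical stand-ins for Flink's `SimpleVersionedSerializer` and 
`ListState<byte[]>`, and the 4-byte version prefix is an assumed framing chosen 
for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SimpleVersionedListStateSketch {

    /** Hypothetical stand-in for SimpleVersionedSerializer. */
    interface VersionedSerializer<T> {
        int getVersion();
        byte[] serialize(T value) throws IOException;
        T deserialize(int version, byte[] bytes) throws IOException;
    }

    /** Hypothetical, in-memory stand-in for ListState<byte[]>. */
    static final class BytesListState {
        private final List<byte[]> entries = new ArrayList<>();
        Iterable<byte[]> get() { return entries; }
        void add(byte[] value) { entries.add(value); }
    }

    /** Typed view: callers work with T, the wrapped state only sees byte[]. */
    static final class VersionedListState<T> {
        private final BytesListState rawState;
        private final VersionedSerializer<T> serializer;

        VersionedListState(BytesListState rawState, VersionedSerializer<T> serializer) {
            this.rawState = rawState;
            this.serializer = serializer;
        }

        void add(T value) throws IOException {
            byte[] payload = serializer.serialize(value);
            // Assumed framing: 4-byte version, then the serialized payload.
            ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
            buf.putInt(serializer.getVersion());
            buf.put(payload);
            rawState.add(buf.array());
        }

        List<T> get() throws IOException {
            List<T> result = new ArrayList<>();
            for (byte[] raw : rawState.get()) {
                ByteBuffer buf = ByteBuffer.wrap(raw);
                int version = buf.getInt();
                byte[] payload = new byte[buf.remaining()];
                buf.get(payload);
                result.add(serializer.deserialize(version, payload));
            }
            return result;
        }
    }

    /** Toy serializer for String "splits", used only by the demo. */
    static final class StringSerializer implements VersionedSerializer<String> {
        public int getVersion() { return 1; }
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
        public String deserialize(int version, byte[] bytes) throws IOException {
            if (version != 1) {
                throw new IOException("Unknown version: " + version);
            }
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /** Writes the values through the typed view and reads them back. */
    public static List<String> roundTrip(List<String> splits) {
        try {
            VersionedListState<String> state =
                    new VersionedListState<>(new BytesListState(), new StringSerializer());
            for (String split : splits) {
                state.add(split);
            }
            return state.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("split-0", "split-1")));
    }
}
```

   With a wrapper of this shape, the operator code only deals with split 
objects, and the version handling lives in one place.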
   
   ### (2) Initialization of the SourceOperator
   
   Before this change, the `SourceOperator` took a `Source` in the constructor.
   All actual components that the `SourceOperator` relies on when working were 
lazily initialized, in `open()` or via setters. This change moves towards more 
eager initialization, which is the purpose of the new 
`SourceOperatorFactory`-based approach.
   
   Relying on something as broad as `Source` also means that a lot of 
redundant context has to be provided to the `SourceOperator` during 
initialization. The `Source` is, for example, also responsible for the 
`SourceEnumerator`, which is independent of the `SourceOperator`. Even so, the 
enumerator had to be considered during testing, because the tests needed to 
mock a full `Source` in order to instantiate a `SourceOperator`.
   
   This change passes the collaborators of the `SourceOperator` directly and 
eagerly into the constructor. That is not fully possible for the 
`SourceReader`, but there we can still reduce the scope by passing a targeted 
factory function instead of the whole `Source`.
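
   The constructor shape described above can be sketched like this. All types 
here (`ReaderContext`, `Reader`, `EventGateway`, `Operator`) are hypothetical 
simplifications rather than the actual Flink classes, and the sketch assumes 
that the reader needs a context that only exists once the operator is opened, 
which is why it receives a factory function instead of a ready-made reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class EagerInitSketch {

    /** Hypothetical runtime context, assumed to be available only at open(). */
    interface ReaderContext { int subtaskIndex(); }

    /** Hypothetical, heavily reduced reader. */
    interface Reader { String poll(); }

    /** Hypothetical collaborator that can be handed over eagerly. */
    interface EventGateway { void send(String event); }

    static final class Operator {
        private final Function<ReaderContext, Reader> readerFactory;
        private final EventGateway gateway;
        private Reader reader; // created in open(), once the context exists

        // Collaborators arrive through the constructor; no full Source is needed.
        Operator(Function<ReaderContext, Reader> readerFactory, EventGateway gateway) {
            this.readerFactory = Objects.requireNonNull(readerFactory);
            this.gateway = Objects.requireNonNull(gateway);
        }

        void open(ReaderContext context) {
            reader = readerFactory.apply(context);
            gateway.send("reader-registered:" + context.subtaskIndex());
        }

        String emitNext() {
            return reader.poll();
        }
    }

    /** A test-style setup: two lambdas replace a mocked Source. */
    public static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        Operator operator = new Operator(
                ctx -> () -> "record-from-subtask-" + ctx.subtaskIndex(),
                log::add);
        operator.open(() -> 3);
        log.add(operator.emitNext());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

   In a setup like this, a test supplies only the reader factory and the 
gateway, so nothing related to the enumerator has to be mocked.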
   
   ## Verifying this change
   
   This PR does not change any behavior, only internal design. The 
functionality is already covered by existing unit tests. The PR adjusts the 
relevant tests where needed.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   

