StephanEwen opened a new pull request #8290: [FLINK-12070] [network] Implement 
new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290
 
 
   ## What is the purpose of the change
   
   This PR changes the implementation of the bounded blocking result partitions (batch shuffles) to make it simpler and more efficient, and, most importantly, to allow for multiple (and concurrent) consumers of that data.
   
   Rather than using a limited set of Flink network buffers, manually managing them, and spilling them to disk, this implementation simply creates a temporary memory mapped file, writes the data to that memory region, and serves results from there (a minimal sketch of the idea follows the list of advantages below).
   
   This has multiple advantages:
     - Simple implementation that stays in memory as much as possible, but is 
not limited to memory. The memory mapped file asynchronously spills to disk.
     - Result does not occupy Flink network buffers, saving us the complexity 
of reasoning about the buffer resource consumption of the result when keeping 
it around for longer (for batch recovery and interactive queries).
     - Fewer memory copies of the data.
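
   To illustrate the core idea, here is a minimal, self-contained sketch. It is not the actual `BoundedBlockingSubpartition` code; the class and method names are illustrative, and the region is sized up front for simplicity. Data is written into a memory mapped temporary file, and each reader gets an independent read-only view over the same mapped region.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative sketch: write data into a memory mapped temp file, serve readers from views of it. */
public class MemoryMappedRegionSketch {

    private final FileChannel channel;
    private final MappedByteBuffer region;

    MemoryMappedRegionSketch(Path tempFile, long maxBytes) throws IOException {
        this.channel = FileChannel.open(tempFile,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        // map the file into memory; the OS writes the pages back to disk asynchronously
        this.region = channel.map(FileChannel.MapMode.READ_WRITE, 0, maxBytes);
    }

    /** Producer side: copy the serialized bytes into the mapped region. */
    void write(ByteBuffer data) {
        region.put(data);
    }

    /** Consumer side: every reader gets its own read-only view over the bytes written so far. */
    ByteBuffer createReaderView() {
        ByteBuffer view = region.duplicate();
        view.flip(); // limit = bytes written so far, position = 0
        return view.asReadOnlyBuffer();
    }

    void close() throws IOException {
        // a real implementation would also unmap the region eagerly, see the discussion below
        channel.close();
    }
}
```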
   
   ## Open Architecture Questions
   
   @pnowojski @NicoK @zhijiangW Any of you could probably help here.
   
   The biggest open question in this implementation is: when and how to 
"release" the readers (`ResultSubpartitionView`). To give some background:
   
     - By default in Java, memory mapped files are unmapped during GC, once no one references the ByteBuffer any more. This is similar to the way Java deals with DirectByteBuffers.
     - This means that unmapping can happen rather late. When one creates many memory mapped files and GC does not happen frequently (which is what we try to achieve), the memory mapped files linger. This can cause problems, because it means occupying resources for too long.
     - Netty, for example, aggressively releases DirectByteBuffers and unmaps memory mapped files to prevent that, and has some utilities for doing so.
     - This implementation follows the same approach, to make sure we release 
resources fast.
     - Concretely, this means that files are unmapped once the `BoundedBlockingSubpartition` is released and all readers (ResultSubpartitionViews) are released. A small sketch of that release logic follows below.
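
   As mentioned in the last item, here is a small sketch of what that eager unmapping could look like. It is not the code from this PR; the class name and counters are illustrative, and it uses Netty's `PlatformDependent.freeDirectBuffer(...)` as one example of the kind of unmapping utility referred to above.

```java
import java.nio.MappedByteBuffer;

import io.netty.util.internal.PlatformDependent;

/** Illustrative sketch: unmap the file eagerly once the partition and all of its readers are released. */
class EagerUnmapSketch {

    private final MappedByteBuffer mappedRegion;

    private int openReaders;            // guarded by 'this'
    private boolean partitionReleased;  // guarded by 'this'
    private boolean unmapped;           // guarded by 'this'

    EagerUnmapSketch(MappedByteBuffer mappedRegion) {
        this.mappedRegion = mappedRegion;
    }

    synchronized void onReaderCreated() {
        openReaders++;
    }

    synchronized void onReaderReleased() {
        openReaders--;
        unmapIfDone();
    }

    synchronized void onPartitionReleased() {
        partitionReleased = true;
        unmapIfDone();
    }

    private void unmapIfDone() {
        if (partitionReleased && openReaders == 0 && !unmapped) {
            unmapped = true;
            // frees the mapped memory immediately, instead of waiting for the
            // MappedByteBuffer to be garbage collected
            PlatformDependent.freeDirectBuffer(mappedRegion);
        }
    }
}
```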
   
   Previously, the readers were tightly coupled to the partitions: once the partition was disposed, the readers were immediately disposed as well. This definitely makes sense for the `PipelinedSubpartition`, because the producer and consumer run concurrently. For the `BoundedBlockingSubpartition`, this is not strictly required, especially since we don't occupy a strictly limited resource (like network buffers).
   
   We could go various routes here:
   
     1. The SubPartitionView (readers) can continue when the ResultPartition is 
released. No new readers can be created, and once all existing readers are 
released, the data file will be unmapped from memory and get deleted. This is 
the version implemented in this PR.
     
        - This option is simple and efficient. The assumption is that by the 
time the consumer (Netty or local channel) calls "releaseAllResources()" it 
won't access buffers any more, so we don't need to track buffer reference 
counts before unmapping the file. That seems to be strictly true throughout all 
existing code.
   
        - We assume that once the consumer tasks are cancelled, the SubPartitionView gets released within a reasonable time. That is definitely true for local input channels; however, I could not verify this in the Netty code that ships data. Netty might end up shipping large amounts of data, which gets discarded on the receiver side. In a clean design, when the receiver closes the logical channel, the sender side should notice and release the SubPartitionView, making this approach work nicely.
     
     ==> This is my favoured design, modulo clarifying the network stack 
behavior as described above.
   
     2. The SubPartitionView (readers) get immediately disposed when the 
ResultSubpartition is disposed. The ResultSubpartition forwards the release() 
call to all readers.
     
        - To prevent segfaults, we can only mark the reader as disposed, but need to defer unmapping until all buffers that reference the memory are released. That means keeping a reference counter (increased when slicing a buffer out of the memory mapped byte buffer, decreased when that buffer is recycled). This is possible, but extra overhead (see the sketch after this list of options).
        
        - This avoids making the assumption that Netty cancels the logical 
channel when the receiver ends (cancels the partition request).
   
     ==> I would like to avoid this route, to avoid fine grained buffer 
reference tracking.
   
     3. When the `BoundedBlockingSubpartition` is disposed, it does not dispose 
the readers, but sets a flag such that the readers error out upon the next read 
request.
     
        - This seems like a hack to me, caused by unclean designs in other 
components.
   
        - This avoids making the assumption that Netty cancels the logical 
channel when the receiver ends (cancels the partition request).
   
        - Currently, reading from the subpartition must not throw an exception; otherwise the network stack will close the full TCP channel instead of just cancelling the one logical channel. We would need to change the Netty stack to
           - Either handle exceptions during reading from the subpartition views.
           - Or return "null" (which can indicate 'empty' or 'erroneous') and then make sure Netty calls "release()" even though "isReleased()" is already true (super unclean, we should totally avoid this option).
           - Or return "null" (which can indicate 'empty' or 'erroneous') and then check a new status "close requested" and let Netty call "release()" in that case.
   
     ==> I would like to avoid this option, because it seems like hacking around the symptoms of a problem caused by unclearly defined behavior/responsibilities.
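
   For reference, the reference counting described in option 2 would look roughly like the sketch below. It is only illustrative, under the assumptions stated in that option: the names are made up and the actual unmapping call is left abstract.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative sketch of option 2: defer unmapping until every sliced buffer has been recycled. */
class BufferRefCountSketch {

    private final AtomicInteger outstandingBuffers = new AtomicInteger(0);
    private final AtomicBoolean unmapped = new AtomicBoolean(false);

    private volatile boolean releaseRequested;

    /** Increase the count whenever a buffer is sliced out of the memory mapped byte buffer. */
    void onBufferSliced() {
        outstandingBuffers.incrementAndGet();
    }

    /** Decrease the count when that buffer is recycled; unmap once nothing references the memory. */
    void onBufferRecycled() {
        if (outstandingBuffers.decrementAndGet() == 0 && releaseRequested) {
            unmapOnce();
        }
    }

    /** Called when the ResultSubpartition forwards its release() to the readers. */
    void requestRelease() {
        releaseRequested = true;
        if (outstandingBuffers.get() == 0) {
            unmapOnce();
        }
    }

    private void unmapOnce() {
        if (unmapped.compareAndSet(false, true)) {
            // the actual unmapping of the memory mapped region would happen here
        }
    }
}
```

   This is the per-buffer bookkeeping overhead that option 1 avoids.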
   
   ## Further ToDos and Follow-ups
   
   Aside from answering the above question, the main remaining task is to 
benchmark this at large scale.
   
   ## Brief change log
   
      - f239f4a : Minor fix to release unpooled event buffers on Netty's sender side. Fixes a segfault in a special case when using direct buffers and attaching a debugger.
   
     - 6a3cd72 , e649dde , 1cf4c10 , 3fc1ae7 : Various minor cleanups of tests, 
such as mocking and consolidating utils.
   
      - 9e0a879 , 8dab3fa : Adjusts some assumptions in the shared tests for all Subpartition implementations to cover only what we want to universally assume for all implementations. Some tests thus become specific to the `PipelinedSubpartition` implementation.
     
      - 5ccd2ad , 5d62657 : Remove the old SpillableSubpartition code and move code in the base class that was specific to the pipelined and the previous spillable implementations into the `PipelinedSubpartition` class.
   
     - 583c32c : Implementation of new `BoundedBlockingSubpartition`
     
     - 29686c0 : Blocking partitions no longer occupy network buffers after 
writing, and thus tests that check the reaction to `releaseMemory()` calls are 
no longer applicable.
     
     - 169e097 : Tests for the new implementation of 
`BoundedBlockingSubpartition`
   
   ## Verifying this change
   
   The code can be tested with any batch job that does blocking data exchanges.
   To force batch data exchanges, add 
`env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);`
   to any DataSet program. Otherwise many data exchanges will currently still 
be pipelined.
   
   To test this in-IDE, take the *WordCount* example and add the above line.
   To test the network code, use a local environment and pass a configuration 
that starts multiple local TaskManagers.
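
   Putting the two together, a test program could look roughly like the sketch below. The `BATCH_FORCED` line is the one mentioned above; the rest (a local environment with two TaskManagers and the word count itself) is just an illustrative setup and may need adjusting to the exact Flink version, e.g. whether `ConfigConstants.LOCAL_NUMBER_TASK_MANAGER` is honored.

```java
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class BatchForcedWordCount {

    public static void main(String[] args) throws Exception {
        // local setup with more than one TaskManager, so data actually goes through the network stack
        Configuration config = new Configuration();
        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);

        // force blocking (batch) data exchanges, as described above
        env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);

        env.fromElements("to be or not to be", "that is the question")
            .flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1)
            .print();
    }

    /** Standard word count tokenizer. */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
```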
   
   This also adds several unit tests:
     - `BoundedBlockingSubpartitionTest` for common behavior tests around 
releasing and buffer disposal.
     - `BoundedBlockingSubpartitionWriteReadTest` for producing data into a 
partition and consuming it. This test also tests that partitions can be 
consumed multiple times and can be read by multiple concurrent readers.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **yes, batch 
data exchange**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes, batch intermediate 
results used during recovery**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   
