StephanEwen opened a new pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290

## What is the purpose of the change

This PR changes the implementation of the bounded blocking result partitions (batch shuffles) to make the implementation simpler and more efficient, and, most importantly, to allow for multiple (and concurrent) consumers of that data.

Rather than using a limited set of Flink network buffers, managing them manually, and spilling them to disk, this implementation simply creates a temporary memory-mapped file, writes the data to that memory region, and serves results from there.

This has multiple advantages:

- Simple implementation that stays in memory as much as possible, but is not limited to memory: the memory-mapped file spills to disk asynchronously.
- The result does not occupy Flink network buffers, saving us the complexity of reasoning about the buffer resource consumption of the result when keeping it around for longer (for batch recovery and interactive queries).
- Fewer memory copies of the data.

## Open Architecture Questions

@pnowojski @NicoK @zhijiangW Any of you could probably help here.

The biggest open question in this implementation is when and how to "release" the readers (`ResultSubpartitionView`). To give some background:

- By Java's default, memory-mapped files are unmapped during GC, when no one references the `ByteBuffer` any more. This is similar to the way Java deals with `DirectByteBuffer`s.
- This means that unmapping can happen rather late. When one creates many memory-mapped files and GC does not happen frequently (which is what we try to achieve), the memory-mapped files linger. This can cause problems, because resources stay occupied for too long.
- Netty, for example, aggressively releases `DirectByteBuffer`s and unmaps memory-mapped files to prevent that; it has some utilities for this.
- This implementation follows the same approach, to make sure we release resources fast.
- Concretely, this means that files are unmapped once the `BoundedBlockingSubpartition` is released and all readers (`ResultSubpartitionView`s) are released.

Previously, the readers were tightly coupled to the partitions: once the partition was disposed, the readers were immediately disposed as well. This definitely makes sense for the `PipelinedSubpartition`, because the producer and consumer run concurrently. For the `BoundedBlockingSubpartition`, this is not strictly required, especially since we don't occupy a strictly limited resource (like network buffers).

We could go various routes here:

1. The `ResultSubpartitionView`s (readers) can continue when the `ResultPartition` is released. No new readers can be created, and once all existing readers are released, the data file is unmapped from memory and deleted. This is the version implemented in this PR.
    - This option is simple and efficient. The assumption is that by the time the consumer (Netty or a local channel) calls `releaseAllResources()`, it won't access buffers any more, so we don't need to track buffer reference counts before unmapping the file. That seems to be strictly true throughout all existing code.
    - We assume that once the consumer tasks are cancelled, the `ResultSubpartitionView` gets released within reasonable time. That is definitely true for local input channels; however, I could not verify this in the Netty code that ships data. Netty might end up shipping large amounts of data, which get discarded on the receiver side. In a clean design, when the receiver closes the logical channel, the sender side should notice and release the `ResultSubpartitionView`, making this approach work nicely.

    ==> This is my favoured design, modulo clarifying the network stack behavior as described above.

2. The `ResultSubpartitionView`s (readers) get disposed immediately when the `ResultSubpartition` is disposed.
    The `ResultSubpartition` forwards the `release()` call to all readers.
    - To prevent segfaults, we can only mark the reader as disposed, but need to defer unmapping until all buffers that reference the memory are released. That means keeping a reference counter (increased when slicing a buffer out of the memory-mapped byte buffer, decreased when that buffer is recycled). This is possible, but extra overhead.
    - This avoids the assumption that Netty cancels the logical channel when the receiver ends (cancels the partition request).

    ==> I would like to avoid this route, to avoid fine-grained buffer reference tracking.

3. When the `BoundedBlockingSubpartition` is disposed, it does not dispose the readers, but sets a flag such that the readers error out upon the next read request.
    - This seems like a hack to me, caused by unclean designs in other components.
    - This avoids the assumption that Netty cancels the logical channel when the receiver ends (cancels the partition request).
    - Currently, reading from the subpartition must not throw an exception; otherwise, the network stack closes the full TCP channel instead of cancelling just the one logical channel. We would need to change the Netty stack to
        - either handle exceptions during reading from the subpartition views,
        - or return `null` (which can indicate 'empty' or 'erroneous') and then make sure Netty calls `release()` even though `isReleased()` is already true (super unclean, we should totally avoid this option),
        - or return `null` (which can indicate 'empty' or 'erroneous') and then check a new status "close requested" and let Netty call `release()` in that case.

    ==> I would like to avoid this option, because it seems to hack around symptoms of problems caused by unclearly defined behavior/responsibilities.

## Further ToDos and Follow-ups

Aside from answering the above question, the main remaining task is to benchmark this at large scale.
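To make the memory-mapping approach concrete, here is a minimal, self-contained sketch of the write/read path using plain `java.nio` — this is not the PR's actual code, and the class and method names are made up for illustration. The producer fills a mapped region of a temp file; a reader gets an independent view (`duplicate()`) over the same mapped memory, so no data is copied:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapSubpartitionSketch {

    // Writes the payload into a memory-mapped temp file and reads it back
    // through an independent reader view over the same mapped region.
    public static String writeAndRead(byte[] payload) throws IOException {
        Path file = Files.createTempFile("bounded-blocking-subpartition", ".data");
        try (FileChannel channel = FileChannel.open(
                file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

            // Map a region of the file into memory. The OS pages it out to
            // disk asynchronously; writes are plain memory writes.
            MappedByteBuffer mapped =
                    channel.map(FileChannel.MapMode.READ_WRITE, 0, payload.length);
            mapped.put(payload); // producer side: fill the mapping

            // Consumer side: a duplicate shares the mapped memory but has
            // its own position/limit, so the reader needs no data copy.
            ByteBuffer readerView = mapped.duplicate();
            readerView.flip();
            byte[] readBack = new byte[readerView.remaining()];
            readerView.get(readBack);
            return new String(readBack, StandardCharsets.UTF_8);
        } finally {
            // Deleting the file is independent of unmapping: by default the
            // mapping itself is only released by GC -- the lingering behavior
            // discussed above, which the PR avoids by unmapping eagerly.
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeAndRead("some record bytes".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Note that the sketch sidesteps the eager-unmap question entirely: it simply drops the reference to the `MappedByteBuffer` and lets GC unmap, which is exactly the default behavior the options above try to improve on.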
## Brief change log

- f239f4a : Minor fix to release unpooled event buffers on Netty's sender side. Fixes a segfault in a special case when using direct buffers and attaching a debugger.
- 6a3cd72 , e649dde , 1cf4c10 , 3fc1ae7 : Various minor cleanups of tests, such as mocking and consolidating utils.
- 9e0a879 , 8dab3fa : Adjusts some assumptions in the shared tests for all subpartition implementations to cover only what we want to universally assume for all implementations. Some tests thus become specific to the `PipelinedSubpartition` implementation.
- 5ccd2ad , 5d62657 : Removes the old `SpillableSubpartition` code and moves code in the base class that was specific to the pipelined and previous spillable implementations into the `PipelinedSubpartition` class.
- 583c32c : Implementation of the new `BoundedBlockingSubpartition`.
- 29686c0 : Blocking partitions no longer occupy network buffers after writing, so tests that check the reaction to `releaseMemory()` calls are no longer applicable.
- 169e097 : Tests for the new implementation of `BoundedBlockingSubpartition`.

## Verifying this change

The code can be tested with any batch job that does blocking data exchanges. To force batch data exchanges, add `env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);` to any DataSet program; otherwise, many data exchanges will currently still be pipelined.

To test this in the IDE, take the *WordCount* example and add the above line. To test the network code, use a local environment and pass a configuration that starts multiple local TaskManagers.

This also adds several unit tests:

- `BoundedBlockingSubpartitionTest` for common behavior tests around releasing and buffer disposal.
- `BoundedBlockingSubpartitionWriteReadTest` for producing data into a partition and consuming it. This test also verifies that partitions can be consumed multiple times and can be read by multiple concurrent readers.
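The "multiple concurrent readers" property that the write/read test exercises follows from how `java.nio` mapped buffers work: each reader can operate on its own `duplicate()` of the shared mapping, with independent position/limit, so concurrent reads need no synchronization. A minimal standalone sketch (not the PR's test code; names are invented):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentReaderSketch {

    // Sums the unsigned byte values visible through the given view.
    static long sumView(ByteBuffer view) {
        long sum = 0;
        while (view.hasRemaining()) {
            sum += view.get() & 0xFF;
        }
        return sum;
    }

    // Maps the file once, then lets numReaders threads consume it in
    // parallel, each through its own duplicate of the shared mapping.
    public static List<Long> readConcurrently(Path file, int numReaders)
            throws IOException, InterruptedException, ExecutionException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer shared = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            ExecutorService pool = Executors.newFixedThreadPool(numReaders);
            try {
                List<Future<Long>> futures = new ArrayList<>();
                for (int i = 0; i < numReaders; i++) {
                    futures.add(pool.submit(() -> sumView(shared.duplicate())));
                }
                List<Long> results = new ArrayList<>();
                for (Future<Long> f : futures) {
                    results.add(f.get());
                }
                return results;
            } finally {
                pool.shutdown();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("partition", ".data");
        Files.write(file, new byte[] {1, 2, 3, 4, 5});
        // All readers see the same data, independently of each other.
        System.out.println(readConcurrently(file, 3));
        Files.deleteIfExists(file);
    }
}
```

This is also why repeated consumption is cheap here: re-reading does not re-spill or re-buffer anything, it just creates another lightweight view over the mapping.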
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes, batch data exchange**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes, batch intermediate results used during recovery**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**