[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138484103
Would be interesting to benchmark the off-heap mode for memory-intensive jobs. I would tend to agree with your performance assessment. Since Flink has always used off-heap-style techniques (albeit on heap), the difference should not be gigantic. I'd be happy if you took this over. I could merge it (it breaks nothing) and you could pick up the mini cluster and script configs.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1093#discussion_r38902942
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
+ * for heap data are of the same type. That way, the runtime does not mix the various specializations
+ * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
+ * to method specialization by the JIT and to overall performance.
+ *
+ * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * if a request to create a segment comes before the initialization.
+ */
+public class MemorySegmentFactory {
+
+    /** The factory to use */
+    private static Factory factory;
--- End diff --
Good question. This is actually initialized once at TaskManager startup. It is probably fine without volatile. The factory is called very frequently, so it would be nice to avoid a volatile read there. Maybe we can make a two-stage initialization, with a `FactoryInitializer` holding a volatile field that is copied into the factory's final field upon initialization.
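One way to read the two-stage idea above is the initialization-on-demand holder idiom: a volatile field is written once on the cold startup path, and a nested holder class copies it into a `static final` field the first time the factory is used. The following is only a sketch under that reading, not the code that was merged. The names `SegmentFactorySketch`, `Holder`, and the `ByteBuffer`-returning `Factory` interface are hypothetical, and the heap fallback mirrors the auto-initialization described in the quoted Javadoc.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch; names are illustrative, not Flink's actual classes. */
final class SegmentFactorySketch {

    /** Allocates the backing memory for one segment. */
    interface Factory {
        ByteBuffer allocate(int size);
    }

    /** Stage 1: cold path, written at startup; volatile makes the write visible to all threads. */
    static final class FactoryInitializer {
        private static volatile Factory configured;

        static void configure(Factory factory) {
            if (factory == null) {
                throw new NullPointerException("factory");
            }
            // Calls made after the holder below was initialized no longer change the frozen factory.
            configured = factory;
        }
    }

    /** Stage 2: frozen on first use; class initialization publishes the final field safely. */
    private static final class Holder {
        static final Factory INSTANCE = resolve();

        private static Factory resolve() {
            Factory f = FactoryInitializer.configured;
            if (f != null) {
                return f;
            }
            // Assumed default: fall back to heap memory if nothing was configured.
            return ByteBuffer::allocate;
        }
    }

    /** Hot path: reads a static final field, so no volatile access is involved. */
    static Factory factory() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        FactoryInitializer.configure(ByteBuffer::allocateDirect);
        System.out.println(factory().allocate(32).isDirect()); // prints true
    }
}
```

The trade-off in this sketch is that a late `configure` call is silently ignored once the factory has been used; a real implementation would probably want to reject it explicitly.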
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138480430
That's an impressive pull request, @StephanEwen! From a first glance, it looks very well-thought-out. I would be glad to work on some of the remaining issues like the config entries or MiniCluster tests. In addition, I would like to add some nightly performance tests. Is it fair to say that the off-heap memory doesn't add much value for small-sized JVMs? So the default Flink execution mode will use heap memory and one can switch to off-heap memory for setups with very large JVM heap memory size.
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138506847
Let's merge this without changing the current state of affairs, i.e. keeping on-heap memory management as the default. Then we should file a JIRA to keep track of the remaining (minor) open issues:
- Getting `taskmanager.memory.off-heap`, `taskmanager.memory.size`, and `taskmanager.memory.off-heap-ratio` to work
- Integrating with the YarnTaskManagerRunner
- Adding a test case for the FlinkMiniCluster
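The sizing rules stated in the pull request description (0.7 of the free heap in heap mode, 3x the heap size in off-heap mode) can be sketched as a small function. This is an illustration only: the class and method names are hypothetical, and treating an explicit size as an override in both modes and the ratio as a multiplier of the heap size are assumptions about how `taskmanager.memory.size` and `taskmanager.memory.off-heap-ratio` might behave, not confirmed semantics.

```java
/** Hypothetical sketch of the sizing rules from the PR description; not Flink's actual code. */
final class ManagedMemorySizing {

    /** Assumed default multiplier for off-heap mode ("3x the size of the heap"). */
    static final double DEFAULT_OFF_HEAP_RATIO = 3.0;

    /**
     * @param offHeap       whether the off-heap flag is set
     * @param heapBytes     total JVM heap size
     * @param freeHeapBytes free heap at TaskManager startup
     * @param explicitBytes explicit managed memory size, or 0 if unset (assumed to override both modes)
     * @param offHeapRatio  managed memory as a multiple of the heap size in off-heap mode (assumed meaning)
     */
    static long managedMemoryBytes(boolean offHeap, long heapBytes, long freeHeapBytes,
                                   long explicitBytes, double offHeapRatio) {
        if (explicitBytes > 0) {
            return explicitBytes;
        }
        if (offHeap) {
            return (long) (heapBytes * offHeapRatio);
        }
        // 0.7 of the free heap, in integer arithmetic (rounds down by at most a few bytes).
        return freeHeapBytes / 10 * 7;
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // A 4 GiB budget in off-heap mode: 1 GiB heap plus 3 GiB managed memory outside the heap.
        System.out.println(managedMemoryBytes(true, gib, gib, 0, DEFAULT_OFF_HEAP_RATIO) / gib); // prints 3
    }
}
```

With a ratio of 3, the heap ends up at one quarter of the combined budget, which matches the description's note that startup scripts should scale the heap down to 1/4th.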
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138527113
Okay, let me merge this and see if I can address Henry's comment as part of this...
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138676203
Thanks! Let us keep up the good work :-) @hsaputra I addressed your comment in the commit. I actually made the field volatile; we should be safe that way. I filed two JIRA issues for follow-up (integration with YARN and startup scripts).
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1093
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/1093#discussion_r38966751
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
+ * for heap data are of the same type. That way, the runtime does not mix the various specializations
+ * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
+ * to method specialization by the JIT and to overall performance.
+ *
+ * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * if a request to create a segment comes before the initialization.
+ */
+public class MemorySegmentFactory {
+
+    /** The factory to use */
+    private static Factory factory;
--- End diff --
Ah, looks like you already addressed it in the actual commit, but it is not reflected in the PR =) Echoing others' comments, this is an awesome PR, @StephanEwen!
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138676857
@StephanEwen ah yes I saw the update =) Sorry I missed it in the commit check, thanks again for the hard work!
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/1093#discussion_r38966473
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
+ * for heap data are of the same type. That way, the runtime does not mix the various specializations
+ * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
+ * to method specialization by the JIT and to overall performance.
+ *
+ * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * if a request to create a segment comes before the initialization.
+ */
+public class MemorySegmentFactory {
+
+    /** The factory to use */
+    private static Factory factory;
--- End diff --
@StephanEwen since you already merged this PR, could you file a JIRA to follow up on this one? I want to make sure we get this right.
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/1093#discussion_r38951685
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
+ * for heap data are of the same type. That way, the runtime does not mix the various specializations
+ * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
+ * to method specialization by the JIT and to overall performance.
+ *
+ * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * if a request to create a segment comes before the initialization.
+ */
+public class MemorySegmentFactory {
+
+    /** The factory to use */
+    private static Factory factory;
--- End diff --
+1 for that. Want to make sure we do concurrency the right way =)
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138636557
I agree with keeping on-heap memory management as the default. We will need to test on a YARN deployment to make sure the kinks are worked out there.
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138237365
Impressive! :-) Thanks for the write-up. I think I've learned something about the JVM by reading your PR description. ;) With some minor refactorings in the network stack, we could get rid of a memory copy before putting off-heap segments on the wire. Instead of returning `Buffer` instances we would return `ByteBuf` instances (Netty's recyclable buffer API), which could then be handed to Netty directly. Currently, we copy from our `Buffer`s to Netty's `ByteBuf`s. I would not expect a huge performance improvement, but I think there are other good reasons to wrap memory in `ByteBuf`s. This is not urgent, but it is something I have wanted to try out for some time now; maybe I'll try something on a free weekend.
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138242989
Really impressive work @StephanEwen. Will be hard to review due to its sheer extent.
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138269258
I think it looks larger than it is. Many small changes are due to changed signatures of the MemoryManager and the MemorySegment instantiation. Also, the vast majority of the code is benchmarks and tests. Interesting to review are mainly:
- `MemorySegment`
- `HeapMemorySegment`
- `HybridMemorySegment`
- The changes in the `TaskManager`
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-138270482
Since this does not break or alter any existing behavior, I would like to merge it rather soon (as soon as concerns have been cleared). We can polish the off-heap configuration in follow-up pull requests.
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/1093#discussion_r38890190
--- Diff: flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory for memory segments. The purpose of this factory is to make sure that all memory segments
+ * for heap data are of the same type. That way, the runtime does not mix the various specializations
+ * of the {@link org.apache.flink.core.memory.MemorySegment}. Not mixing them has shown to be beneficial
+ * to method specialization by the JIT and to overall performance.
+ *
+ * Note that this factory auto-initialized to use {@link org.apache.flink.core.memory.HeapMemorySegment},
+ * if a request to create a segment comes before the initialization.
+ */
+public class MemorySegmentFactory {
+
+    /** The factory to use */
+    private static Factory factory;
--- End diff --
Should we add the volatile modifier to make sure all threads are looking at the same copy?
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1093

[FLINK-1320] [core] Add an off-heap variant of the managed memory

This pull request extends the Flink managed memory to work transparently on-heap and off-heap.

Flink's core has always had memory management techniques that operate on serialized data (rather than objects) in pre-reserved memory segments on the heap. This has the huge benefit that memory usage can be exactly controlled, and that the garbage collector is "tamed" by not accumulating billions of long-lived objects on the heap. One could say that Flink was using off-heap-style techniques with on-heap memory.

With this pull request, the system can use memory outside the heap as an alternative to heap memory. The benefit is that this gets the managed memory out of the GC's responsibility and vastly reduces the overall heap size (since often 75% of the memory is used as Flink's managed memory, for sorting, hashing, caching, ...). That way, Flink processes can be scaled to 100s of gigabytes of main memory.

## Usage

The pull request introduces one simple config flag: `taskmanager.memory.off-heap: true`

- In heap mode, the TaskManager takes 0.7 of the free heap for the Memory Manager (same as before).
- In off-heap mode, the TaskManager takes 3x the size of the heap for the Memory Manager. This implies that the startup scripts should scale the heap size down to 1/4th, since all big system memory consumers (network, memory manager) now allocate outside the heap.
- One can control the off-heap memory size via `taskmanager.memory.size` and `taskmanager.memory.off-heap-ratio`.

## Design decisions

1. Since the operations on Flink's memory segments are in the innermost loops, they are super performance critical. All code is written to be very JIT friendly.

2. Even though the bulk of the memory may be off-heap (if that mode is chosen), there are various points where short-lived unpooled managed heap memory is needed, so in the off-heap case we need to support mixed memory (on- and off-heap simultaneously).

3. While benchmarking various possible implementations, I found that many operations can be written to transparently work on on-heap and off-heap memory. The only notable exceptions are individual byte accesses, which are faster on specialized heap implementations (up to 50% faster, see benchmarks below).

4. Because individual byte operations occur a lot for Strings and tags/headers, I did not want to compromise on their performance. We now have the following MemorySegment implementations:

   - `MemorySegment`: Implements all multi-byte type accesses (ints, longs, doubles) and copy/compare/swap methods to transparently work on heap/off-heap memory.
   - `HeapMemorySegment`: Optimized single-byte operations and bulk byte transfers. Used for the on-heap case.
   - `HybridMemorySegment`: Transparent on-heap/off-heap single-byte operations and bulk transfers, slightly slower than the HeapMemorySegment. Used for the off-heap case, and can also represent the unpooled on-heap memory at the same time.

   Effectively, some off-heap memory operations are a bit slower than on-heap operations. I think that cannot be avoided, and in this implementation, we have a way to keep peak performance of managed memory on heap.

5. For optimal JIT efficiency, many methods are marked as final. In addition, performance increases if only one of the MemorySegment specializations is used at a time, and the two are not mixed. That way the JIT can optimally de-virtualize and inline methods. Otherwise, it has to switch between optimization/deoptimization or use bimorphic inlining. Since the memory segment accesses are the absolute innermost loops, we care about these last few CPU cycles. The benchmarks below illustrate this.

   Direct instantiation of the memory segments is therefore not easily possible; access must go through a `MemorySegmentFactory` that manages which implementation to load and use.

6. The implementations of the individual memory segment functions take care of the following:

   - Use methods that are JIT intrinsics, i.e., get jitted to optimized native instructions.
   - Use collapsed checks for range checks and disposal checks.
   - Avoid branches between heap/off-heap cases, and use a uniform code path instead (for primitive type accesses).
   - The code paths for valid (non-exception) cases are always the first option at branches, for single-element operations.
   - As long as the memory segment object exists, all operations are guaranteed to work and not lead to segmentation faults, even if the segment has been concurrently freed.

## Tests

More than 6k lines of tests in
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-137857146
Isn't there another PR sent for this issue by Max? Or was it for something else?
[GitHub] flink pull request: [FLINK-1320] [core] Add an off-heap variant of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1093#issuecomment-137872521
I took the code from #290 (by @mxm and me) as the basis, but heavily reworked it. Some code is similar, most notably the collapsed address/release/bound check in the HybridMemorySegment. Most is different:
- The heap and off-heap code share more functions
- The off-heap segment is now a hybrid segment that can handle heap and off-heap seamlessly
- Strategy to load only one memory segment implementation (best JIT efficiency)
- Improved and fixed bulk operations
- Many many many more tests
- Many bugs fixed on the way
- Added a suite of benchmarks to the tests
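The collapsed address/release/bound check mentioned above can be illustrated roughly as follows. This is a sketch of the general technique, not the `HybridMemorySegment` code: the class `CollapsedCheckSegment` is hypothetical, and a `java.nio.ByteBuffer` with a simulated start offset stands in for whatever raw memory access the real implementation uses. The point is that freeing the segment moves `address` past `addressLimit`, so the single range comparison on the hot path also rejects accesses to freed segments, and the cause is only sorted out on the failure path.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of a collapsed bounds/disposal check; not Flink's actual class. */
final class CollapsedCheckSegment {

    private final ByteBuffer arena;   // assumed backing memory (a direct buffer is off-heap)
    private long address;             // start offset inside the arena; moved past the limit by free()
    private final long addressLimit;  // one past the last valid byte of this segment

    CollapsedCheckSegment(ByteBuffer arena, int offset, int size) {
        if (offset < 0 || size < 0 || (long) offset + size > arena.capacity()) {
            throw new IllegalArgumentException("segment does not fit into the arena");
        }
        this.arena = arena;
        this.address = offset;
        this.addressLimit = (long) offset + size;
    }

    int getInt(int index) {
        final long pos = address + index;
        // One comparison covers both "index out of range" and "segment freed".
        if (index >= 0 && pos <= addressLimit - 4) {
            return arena.getInt((int) pos);
        }
        throw failure(index);
    }

    void putInt(int index, int value) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 4) {
            arena.putInt((int) pos, value);
            return;
        }
        throw failure(index);
    }

    /** After this call every access fails the collapsed check above. */
    void free() {
        address = addressLimit + 1;
    }

    /** Slow path: only here do we distinguish a freed segment from a bad index. */
    private RuntimeException failure(int index) {
        if (address > addressLimit) {
            return new IllegalStateException("segment has been freed");
        }
        return new IndexOutOfBoundsException("index " + index);
    }

    public static void main(String[] args) {
        CollapsedCheckSegment segment = new CollapsedCheckSegment(ByteBuffer.allocateDirect(64), 16, 8);
        segment.putInt(4, 42);
        System.out.println(segment.getInt(4)); // prints 42
    }
}
```

The sketch makes no claim about thread safety; it only shows why the valid case needs a single branch.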