This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 81cab8f4869 [FLINK-32213][core] Add get off heap buffer in segment 81cab8f4869 is described below commit 81cab8f486997ef666128cce4903c24d44ac7534 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Tue Jun 6 10:18:14 2023 +0800 [FLINK-32213][core] Add get off heap buffer in segment This closes #22675 --- .../org/apache/flink/core/memory/MemorySegment.java | 14 ++++++++++++++ .../runtime/memory/MemorySegmentSimpleTest.java | 20 ++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java index ce39a122654..93e81fc5be8 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java @@ -279,6 +279,20 @@ public final class MemorySegment { } } + /** + * Returns the off-heap buffer of memory segments. + * + * @return underlying off-heap buffer + * @throws IllegalStateException if the memory segment does not represent off-heap buffer + */ + public ByteBuffer getOffHeapBuffer() { + if (offHeapBuffer != null) { + return offHeapBuffer; + } else { + throw new IllegalStateException("Memory segment does not represent off-heap buffer"); + } + } + /** * Returns the memory address of off-heap memory segments. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java index 8301a355862..d2a8da32f1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java @@ -34,6 +34,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; /** Test reading and writing primitive types to {@link MemorySegment}. */ public class MemorySegmentSimpleTest { @@ -574,4 +576,22 @@ public class MemorySegmentSimpleTest { Assert.fail(e.getMessage()); } } + + @Test + public void testGetOffHeapBuffer() { + MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024); + assertThrows( + IllegalStateException.class, + seg::getOffHeapBuffer, + "Memory segment does not represent off-heap buffer"); + seg.free(); + + seg = MemorySegmentFactory.allocateUnpooledOffHeapMemory(1024); + assertNotNull(seg.getOffHeapBuffer()); + seg.free(); + + seg = MemorySegmentFactory.allocateOffHeapUnsafeMemory(1024); + assertNotNull(seg.getOffHeapBuffer()); + seg.free(); + } }