This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 81cab8f4869 [FLINK-32213][core] Add get off heap buffer in segment
81cab8f4869 is described below

commit 81cab8f486997ef666128cce4903c24d44ac7534
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Tue Jun 6 10:18:14 2023 +0800

    [FLINK-32213][core] Add get off heap buffer in segment
    
    This closes #22675
---
 .../org/apache/flink/core/memory/MemorySegment.java  | 14 ++++++++++++++
 .../runtime/memory/MemorySegmentSimpleTest.java      | 20 ++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index ce39a122654..93e81fc5be8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -279,6 +279,20 @@ public final class MemorySegment {
         }
     }
 
+    /**
+     * Returns the off-heap buffer of memory segments.
+     *
+     * @return underlying off-heap buffer
+     * @throws IllegalStateException if the memory segment does not represent 
off-heap buffer
+     */
+    public ByteBuffer getOffHeapBuffer() {
+        if (offHeapBuffer != null) {
+            return offHeapBuffer;
+        } else {
+            throw new IllegalStateException("Memory segment does not represent 
off-heap buffer");
+        }
+    }
+
     /**
      * Returns the memory address of off-heap memory segments.
      *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
index 8301a355862..d2a8da32f1c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -34,6 +34,8 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Test reading and writing primitive types to {@link MemorySegment}. */
 public class MemorySegmentSimpleTest {
@@ -574,4 +576,22 @@ public class MemorySegmentSimpleTest {
             Assert.fail(e.getMessage());
         }
     }
+
+    @Test
+    public void testGetOffHeapBuffer() {
+        MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
+        assertThrows(
+                IllegalStateException.class,
+                seg::getOffHeapBuffer,
+                "Memory segment does not represent off-heap buffer");
+        seg.free();
+
+        seg = MemorySegmentFactory.allocateUnpooledOffHeapMemory(1024);
+        assertNotNull(seg.getOffHeapBuffer());
+        seg.free();
+
+        seg = MemorySegmentFactory.allocateOffHeapUnsafeMemory(1024);
+        assertNotNull(seg.getOffHeapBuffer());
+        seg.free();
+    }
 }

Reply via email to