This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 492d1d4 [FLINK-11724][core] Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment. 492d1d4 is described below commit 492d1d40100b1297281408bf0d83b6db5378b9cb Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Wed Feb 27 19:37:52 2019 +0800 [FLINK-11724][core] Add copyToUnsafe, copyFromUnsafe and equalTo to MemorySegment. This closes #7847 --- .../apache/flink/core/memory/MemorySegment.java | 77 ++++++++++++++++++++++ .../flink/core/memory/CrossSegmentTypeTest.java | 10 +++ .../flink/core/memory/MemorySegmentTestBase.java | 35 ++++++++++ 3 files changed, 122 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java index b95ceb9..39b6d9c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java @@ -1270,6 +1270,50 @@ public abstract class MemorySegment { } } + /** + * Bulk copy method. Copies {@code numBytes} bytes from this segment to the given target unsafe object and pointer. + * NOTE: This is an unsafe method: the target object and pointer are not checked, so please be careful. + * + * @param offset The position in this memory segment from which the bytes are read. + * @param target The unsafe memory to copy the bytes to. + * @param targetPointer The position in the target unsafe memory to copy the chunk to. + * @param numBytes The number of bytes to copy. + * + * @throws IndexOutOfBoundsException If the source segment does not contain the given number + * of bytes (starting from offset). 
+ */ + public final void copyToUnsafe(int offset, Object target, int targetPointer, int numBytes) { + final long thisPointer = this.address + offset; + if (thisPointer + numBytes > addressLimit) { + throw new IndexOutOfBoundsException( + String.format("offset=%d, numBytes=%d, address=%d", + offset, numBytes, this.address)); + } + UNSAFE.copyMemory(this.heapMemory, thisPointer, target, targetPointer, numBytes); + } + + /** + * Bulk copy method. Copies {@code numBytes} bytes from the given source unsafe object and pointer into this segment. + * NOTE: This is an unsafe method: the source object and pointer are not checked, so please be careful. + * + * @param offset The position in this memory segment at which the bytes are written. + * @param source The unsafe memory to copy the bytes from. + * @param sourcePointer The position in the source unsafe memory to copy the chunk from. + * @param numBytes The number of bytes to copy. + * + * @throws IndexOutOfBoundsException If this segment cannot contain the given number + * of bytes (starting from offset). + */ + public final void copyFromUnsafe(int offset, Object source, int sourcePointer, int numBytes) { + final long thisPointer = this.address + offset; + if (thisPointer + numBytes > addressLimit) { + throw new IndexOutOfBoundsException( + String.format("offset=%d, numBytes=%d, address=%d", + offset, numBytes, this.address)); + } + UNSAFE.copyMemory(source, sourcePointer, this.heapMemory, thisPointer, numBytes); + } + // ------------------------------------------------------------------------- // Comparisons & Swapping // ------------------------------------------------------------------------- @@ -1349,4 +1393,37 @@ public abstract class MemorySegment { String.format("offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d", offset1, offset2, len, tempBuffer.length, this.address, seg2.address)); } + + /** + * Checks whether two memory segment regions are equal. 
+ * + * @param seg2 Segment to compare this segment with + * @param offset1 Offset in this segment at which the comparison starts + * @param offset2 Offset in seg2 at which the comparison starts + * @param length Length of the compared memory region + * + * @return true if equal, false otherwise + */ + public final boolean equalTo(MemorySegment seg2, int offset1, int offset2, int length) { + int i = 0; + + // we assume unaligned accesses are supported. + // Compare 8 bytes at a time. + while (i <= length - 8) { + if (getLong(offset1 + i) != seg2.getLong(offset2 + i)) { + return false; + } + i += 8; + } + + // cover the last (length % 8) elements. + while (i < length) { + if (get(offset1 + i) != seg2.get(offset2 + i)) { + return false; + } + i += 1; + } + + return true; + } } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java index ea144c7..ccff2ba 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java @@ -35,6 +35,8 @@ import static org.junit.Assert.fail; */ public class CrossSegmentTypeTest { + private static final long BYTE_ARRAY_BASE_OFFSET = MemoryUtils.UNSAFE.arrayBaseOffset(byte[].class); + private final int pageSize = 32 * 1024; // ------------------------------------------------------------------------ @@ -187,6 +189,8 @@ public class CrossSegmentTypeTest { byte[] expected = new byte[pageSize]; byte[] actual = new byte[pageSize]; + byte[] unsafeCopy = new byte[pageSize]; + MemorySegment unsafeCopySeg = MemorySegmentFactory.allocateUnpooledSegment(pageSize); // zero out the memory seg1.put(0, expected); @@ -205,6 +209,12 @@ public class CrossSegmentTypeTest { seg1.put(thisPos, bytes); seg1.copyTo(thisPos, seg2, otherPos, numBytes); + seg1.copyToUnsafe(thisPos, unsafeCopy, (int) (otherPos + BYTE_ARRAY_BASE_OFFSET), numBytes); + + int otherPos2 
= random.nextInt(pageSize - numBytes); + unsafeCopySeg.copyFromUnsafe(otherPos2, unsafeCopy, + (int) (otherPos + BYTE_ARRAY_BASE_OFFSET), numBytes); + assertTrue(unsafeCopySeg.equalTo(seg2, otherPos2, otherPos, numBytes)); } seg2.get(0, actual); diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java index fb28948..0b8f1d0 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentTestBase.java @@ -304,6 +304,41 @@ public abstract class MemorySegmentTestBase { } @Test + public void testCopyUnsafeIndexOutOfBounds() { + byte[] bytes = new byte[pageSize]; + MemorySegment segment = createSegment(pageSize); + + try { + segment.copyToUnsafe(1, bytes, 0, pageSize); + fail("should fail with an IndexOutOfBoundsException"); + } + catch (IndexOutOfBoundsException ignored) {} + + try { + segment.copyFromUnsafe(1, bytes, 0, pageSize); + fail("should fail with an IndexOutOfBoundsException"); + } + catch (IndexOutOfBoundsException ignored) {} + } + + @Test + public void testEqualTo() { + MemorySegment seg1 = createSegment(pageSize); + MemorySegment seg2 = createSegment(pageSize); + + int i = new Random().nextInt(pageSize - 8); + + seg1.put(i, (byte) 10); + assertFalse(seg1.equalTo(seg2, i, i, 9)); + + seg1.put(i, (byte) 0); + assertTrue(seg1.equalTo(seg2, i, i, 9)); + + seg1.put(i + 8, (byte) 10); + assertFalse(seg1.equalTo(seg2, i, i, 9)); + } + + @Test public void testCharAccess() { final MemorySegment segment = createSegment(pageSize);