This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new fa4a6991 [Flink-30892] Clean codes for memory and tests fa4a6991 is described below commit fa4a69913987416d7fd6b3ebe21d0eb39945fff2 Author: Feng Wang <99001603+wangfeng...@users.noreply.github.com> AuthorDate: Fri Feb 3 17:58:13 2023 +0800 [Flink-30892] Clean codes for memory and tests This closes #498 --- .../flink/table/store/memory/MemorySegment.java | 8 ---- .../flink/table/store/memory/MemoryUtils.java | 52 ---------------------- ...est.java => HadoopLocalFileIOBehaviorTest.java} | 7 +-- .../table/store/memory/MemorySegmentTestBase.java | 17 +++---- 4 files changed, 11 insertions(+), 73 deletions(-) diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java index b0edac65..b254a07c 100644 --- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java @@ -102,14 +102,6 @@ public final class MemorySegment { } } - public long getAddress() { - if (heapMemory == null) { - return address; - } else { - throw new IllegalStateException("Memory segment does not represent off heap memory"); - } - } - public ByteBuffer wrap(int offset, int length) { return wrapInternal(offset, length); } diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java index 955f2b5f..9ff236ae 100644 --- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java @@ -33,10 +33,6 @@ public class MemoryUtils { private static final long BUFFER_ADDRESS_FIELD_OFFSET = getClassFieldOffset(Buffer.class, "address"); - private static final long BUFFER_CAPACITY_FIELD_OFFSET = - getClassFieldOffset(Buffer.class, "capacity"); - private static final Class<?> DIRECT_BYTE_BUFFER_CLASS = - getClassByName("java.nio.DirectByteBuffer"); @SuppressWarnings("restriction") private static sun.misc.Unsafe getUnsafe() { @@ -94,54 +90,6 @@ public class MemoryUtils { } } - /** - * Allocates unsafe native memory. - * - * @param size size of the unsafe memory to allocate. - * @return address of the allocated unsafe memory - */ - static long allocateUnsafe(long size) { - return UNSAFE.allocateMemory(Math.max(1L, size)); - } - - /** - * Creates a cleaner to release the unsafe memory. - * - * @param address address of the unsafe memory to release - * @param customCleanup A custom action to clean up GC - * @return action to run to release the unsafe memory manually - */ - static Runnable createMemoryCleaner(long address, Runnable customCleanup) { - return () -> { - releaseUnsafe(address); - customCleanup.run(); - }; - } - - private static void releaseUnsafe(long address) { - UNSAFE.freeMemory(address); - } - - /** - * Wraps the unsafe native memory with a {@link ByteBuffer}. - * - * @param address address of the unsafe memory to wrap - * @param size size of the unsafe memory to wrap - * @return a {@link ByteBuffer} which is a view of the given unsafe memory - */ - static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) { - //noinspection OverlyBroadCatchBlock - try { - ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS); - UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address); - UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size); - buffer.clear(); - return buffer; - } catch (Throwable t) { - throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t); - } - } - /** * Get native memory address wrapped by the given {@link ByteBuffer}. * diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileSystemBehaviorTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileIOBehaviorTest.java similarity index 91% rename from flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileSystemBehaviorTest.java rename to flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileIOBehaviorTest.java index 6a24627a..8b799be2 100644 --- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileSystemBehaviorTest.java +++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileIOBehaviorTest.java @@ -18,7 +18,6 @@ package org.apache.flink.table.store.fs; -import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.table.store.fs.hadoop.HadoopFileIO; import org.apache.hadoop.conf.Configuration; @@ -26,17 +25,19 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.util.VersionInfo; import org.junit.jupiter.api.io.TempDir; +import java.net.URI; + import static org.assertj.core.api.Assumptions.assumeThat; /** Behavior tests for Hadoop Local. */ -class HadoopLocalFileSystemBehaviorTest extends FileIOBehaviorTestBase { +class HadoopLocalFileIOBehaviorTest extends FileIOBehaviorTestBase { @TempDir private java.nio.file.Path tmp; @Override protected FileIO getFileSystem() throws Exception { org.apache.hadoop.fs.FileSystem fs = new RawLocalFileSystem(); - fs.initialize(LocalFileSystem.getLocalFsURI(), new Configuration()); + fs.initialize(URI.create("file:///"), new Configuration()); HadoopFileIO fileIO = new HadoopFileIO(); fileIO.setFileSystem(fs); return fileIO; diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java index 49e7bc33..40b382bf 100644 --- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java +++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java @@ -35,13 +35,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Random; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -973,13 +970,13 @@ public abstract class MemorySegmentTestBase { seg1.put(0, bytes1); seg2.put(0, bytes2); - assertThat(seg1.compare(seg2, 0, 0, 3, 4), lessThan(0)); - assertThat(seg1.compare(seg2, 0, 0, 3, 3), equalTo(0)); - assertThat(seg1.compare(seg2, 0, 0, 3, 2), greaterThan(0)); + assertThat(seg1.compare(seg2, 0, 0, 3, 4)).isLessThan(0); + assertThat(seg1.compare(seg2, 0, 0, 3, 3)).isEqualTo(0); + assertThat(seg1.compare(seg2, 0, 0, 3, 2)).isGreaterThan(0); // test non-zero offset - assertThat(seg1.compare(seg2, 1, 1, 2, 3), lessThan(0)); - assertThat(seg1.compare(seg2, 1, 1, 2, 2), equalTo(0)); - assertThat(seg1.compare(seg2, 1, 1, 2, 1), greaterThan(0)); + assertThat(seg1.compare(seg2, 1, 1, 2, 3)).isLessThan(0); + assertThat(seg1.compare(seg2, 1, 1, 2, 2)).isEqualTo(0); + assertThat(seg1.compare(seg2, 1, 1, 2, 1)).isGreaterThan(0); } @Test