Author: cmccabe Date: Fri Aug 9 18:14:07 2013 New Revision: 1512427 URL: http://svn.apache.org/r1512427 Log: HDFS-5049. Add JNI mlock support. (Andrew Wang via Colin Patrick McCabe)
Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java?rev=1512427&r1=1512426&r2=1512427&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java Fri Aug 9 18:14:07 2013 @@ -23,6 +23,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -145,6 +146,12 @@ public class NativeIO { return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded; } + private static void assertCodeLoaded() throws IOException { + if (!isAvailable()) { + throw new IOException("NativeIO was not loaded"); + } + } + /** Wrapper around open(2) */ public static native FileDescriptor open(String path, int flags, int mode) throws IOException; /** Wrapper around fstat(2) */ @@ -225,6 +232,84 @@ public class NativeIO { } } + static native void mlock_native( + ByteBuffer buffer, long len) throws NativeIOException; + static native void munlock_native( + ByteBuffer buffer, long len) throws 
NativeIOException; + + /** + * Locks the provided direct ByteBuffer into memory, preventing it from + * swapping out. After a buffer is locked, future accesses will not incur + * a page fault. + * + * See the mlock(2) man page for more information. + * + * @throws IOException if NativeIO is not loaded, the buffer is not direct, or the native mlock call fails + */ + public static void mlock(ByteBuffer buffer, long len) + throws IOException { + assertCodeLoaded(); + if (!buffer.isDirect()) { + throw new IOException("Cannot mlock a non-direct ByteBuffer"); + } + mlock_native(buffer, len); + } + + /** + * Unlocks a locked direct ByteBuffer, allowing it to swap out of memory. + * This is a no-op if the ByteBuffer was not previously locked. + * + * See the munlock(2) man page for more information. + * + * @throws IOException if NativeIO is not loaded, the buffer is not direct, or the native munlock call fails + */ + public static void munlock(ByteBuffer buffer, long len) + throws IOException { + assertCodeLoaded(); + if (!buffer.isDirect()) { + throw new IOException("Cannot munlock a non-direct ByteBuffer"); + } + munlock_native(buffer, len); + } + + /** + * Resource limit types copied from <sys/resource.h> + */ + private static class ResourceLimit { + public static final int RLIMIT_CPU = 0; + public static final int RLIMIT_FSIZE = 1; + public static final int RLIMIT_DATA = 2; + public static final int RLIMIT_STACK = 3; + public static final int RLIMIT_CORE = 4; + public static final int RLIMIT_RSS = 5; + public static final int RLIMIT_NPROC = 6; + public static final int RLIMIT_NOFILE = 7; + public static final int RLIMIT_MEMLOCK = 8; + public static final int RLIMIT_AS = 9; + public static final int RLIMIT_LOCKS = 10; + public static final int RLIMIT_SIGPENDING = 11; + public static final int RLIMIT_MSGQUEUE = 12; + public static final int RLIMIT_NICE = 13; + public static final int RLIMIT_RTPRIO = 14; + public static final int RLIMIT_RTTIME = 15; + public static final int RLIMIT_NLIMITS = 16; + } + + static native String getrlimit(int limit) throws NativeIOException; + /** + * Returns the soft limit on the number of bytes 
that may be locked by the + * process (RLIMIT_MEMLOCK). + * + * See the getrlimit(2) man page for more information. + * + * @return maximum amount of locked memory in bytes + */ + public static long getMemlockLimit() throws IOException { + assertCodeLoaded(); + String strLimit = getrlimit(ResourceLimit.RLIMIT_MEMLOCK); + return Long.parseLong(strLimit); + } + /** Linux only methods used for getOwner() implementation */ private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException; private static native String getUserName(long uid) throws IOException; Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c?rev=1512427&r1=1512426&r2=1512427&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c (original) +++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c Fri Aug 9 18:14:07 2013 @@ -31,8 +31,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <sys/mman.h> +#include <sys/resource.h> #include <sys/stat.h> #include <sys/syscall.h> +#include <sys/time.h> #include <sys/types.h> #include <unistd.h> #include "config.h" @@ -360,6 +363,76 @@ Java_org_apache_hadoop_io_nativeio_Nativ #endif } +/** + * public static native void mlock_native( + * ByteBuffer buffer, long len); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. 
+ */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mlock_1native( + JNIEnv *env, jclass clazz, + jobject buffer, jlong len) +{ + void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer); + PASS_EXCEPTIONS(env); + + if (mlock(buf, len)) { + throw_ioe(env, errno); + } +} + +/** + * public static native void munlock_native( + * ByteBuffer buffer, long len); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. + */ +JNIEXPORT void JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munlock_1native( + JNIEnv *env, jclass clazz, + jobject buffer, jlong len) +{ + void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer); + PASS_EXCEPTIONS(env); + + if (munlock(buf, len)) { + throw_ioe(env, errno); + } +} + +/** + * public static native String getrlimit( + * int limit); + * + * The "00024" in the function name is an artifact of how JNI encodes + * special characters. U+0024 is '$'. 
+ */ +JNIEXPORT jstring JNICALL +Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_getrlimit( + JNIEnv *env, jclass clazz, + jint resource) +{ + jstring ret = NULL; + + struct rlimit rlim; + int rc = getrlimit((int)resource, &rlim); + if (rc != 0) { + throw_ioe(env, errno); + goto cleanup; + } + + // Convert soft limit into a string + char limit[17]; + int len = snprintf(&limit, 17, "%d", rlim.rlim_cur); + ret = (*env)->NewStringUTF(env,&limit); + +cleanup: + return ret; +} + #ifdef __FreeBSD__ static int toFreeBSDFlags(int flags) { Modified: hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java?rev=1512427&r1=1512426&r2=1512427&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java Fri Aug 9 18:14:07 2013 @@ -24,6 +24,9 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import java.util.concurrent.atomic.AtomicReference; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +35,7 @@ import java.util.List; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import static org.junit.Assume.*; import static org.junit.Assert.*; @@ -45,6 +49,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.util.NativeCodeLoader; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Time; public class TestNativeIO { @@ -524,4 +529,57 @@ public class TestNativeIO { FileUtils.deleteQuietly(TEST_DIR); } + + @Test(timeout=10000) + public void testMlock() throws Exception { + assumeTrue(NativeIO.isAvailable()); + assumeTrue(Shell.LINUX); + final File TEST_FILE = new File(new File( + System.getProperty("test.build.data","build/test/data")), + "testMlockFile"); + final int BUF_LEN = 12289; + byte buf[] = new byte[BUF_LEN]; + int bufSum = 0; + for (int i = 0; i < buf.length; i++) { + buf[i] = (byte)(i % 60); + bufSum += buf[i]; + } + FileOutputStream fos = new FileOutputStream(TEST_FILE); + fos.write(buf); + fos.getChannel().force(true); + fos.close(); + + FileInputStream fis = null; + FileChannel channel = null; + try { + // Map file into memory + fis = new FileInputStream(TEST_FILE); + channel = fis.getChannel(); + long fileSize = channel.size(); + MappedByteBuffer mapbuf = channel.map(MapMode.READ_ONLY, 0, fileSize); + // mlock the buffer + NativeIO.POSIX.mlock(mapbuf, fileSize); + // Read the buffer + int sum = 0; + for (int i=0; i<fileSize; i++) { + sum += mapbuf.get(i); + } + assertEquals("Expected sums to be equal", bufSum, sum); + // munlock the buffer + NativeIO.POSIX.munlock(mapbuf, fileSize); + } finally { + if (channel != null) { + channel.close(); + } + if (fis != null) { + fis.close(); + } + } + } + + @Test(timeout=10000) + public void testGetMemlockLimit() throws Exception { + assumeTrue(NativeIO.isAvailable()); + NativeIO.POSIX.getMemlockLimit(); + } }