This is an automated email from the ASF dual-hosted git repository. rakeshr pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 6eb5fb596ffa44e4c4fe67e960f396cb4b37c80f Author: Sammi Chen <sammic...@apache.org> AuthorDate: Wed Jun 5 21:33:00 2019 +0800 HDFS-14356. Implement HDFS cache on SCM with native PMDK libs. Contributed by Feilong He. (cherry picked from commit d1aad444907e1fc5314e8e64529e57c51ed7561c) --- BUILDING.txt | 28 +++ dev-support/bin/dist-copynativelibs | 8 + hadoop-common-project/hadoop-common/pom.xml | 2 + .../hadoop-common/src/CMakeLists.txt | 21 ++ .../hadoop-common/src/config.h.cmake | 1 + .../org/apache/hadoop/io/nativeio/NativeIO.java | 135 ++++++++++- .../src/org/apache/hadoop/io/nativeio/NativeIO.c | 252 +++++++++++++++++++++ .../src/org/apache/hadoop/io/nativeio/pmdk_load.c | 106 +++++++++ .../src/org/apache/hadoop/io/nativeio/pmdk_load.h | 95 ++++++++ .../apache/hadoop/io/nativeio/TestNativeIO.java | 153 +++++++++++++ .../datanode/fsdataset/impl/FsDatasetCache.java | 22 ++ .../datanode/fsdataset/impl/FsDatasetImpl.java | 8 + .../datanode/fsdataset/impl/FsDatasetUtil.java | 22 ++ .../datanode/fsdataset/impl/MappableBlock.java | 6 + .../fsdataset/impl/MappableBlockLoader.java | 11 +- .../fsdataset/impl/MappableBlockLoaderFactory.java | 4 + .../fsdataset/impl/MemoryMappableBlockLoader.java | 8 +- .../datanode/fsdataset/impl/MemoryMappedBlock.java | 5 + ...der.java => NativePmemMappableBlockLoader.java} | 166 +++++++------- ...MappedBlock.java => NativePmemMappedBlock.java} | 49 ++-- .../fsdataset/impl/PmemMappableBlockLoader.java | 10 +- .../datanode/fsdataset/impl/PmemMappedBlock.java | 5 + 22 files changed, 1009 insertions(+), 108 deletions(-) diff --git a/BUILDING.txt b/BUILDING.txt index 1b900c3..0698469 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -86,6 +86,8 @@ Optional packages: $ sudo apt-get install fuse libfuse-dev * ZStandard compression $ sudo apt-get install zstd +* PMDK library for storage class memory(SCM) as HDFS cache backend + Please refer to http://pmem.io/ and https://github.com/pmem/pmdk ---------------------------------------------------------------------------------- Maven main modules: @@ -262,6 +264,32 @@ Maven build goals: invoke, run 'mvn dependency-check:aggregate'. Note that this plugin requires maven 3.1.1 or greater. + PMDK library build options: + + The Persistent Memory Development Kit (PMDK), formerly known as NVML, is a growing + collection of libraries which have been developed for various use cases, tuned, + validated to production quality, and thoroughly documented. These libraries are built + on the Direct Access (DAX) feature available in both Linux and Windows, which allows + applications directly load/store access to persistent memory by memory-mapping files + on a persistent memory aware file system. + + It is currently an optional component, meaning that Hadoop can be built without + this dependency. Please Note the library is used via dynamic module. For getting + more details please refer to the official sites: + http://pmem.io/ and https://github.com/pmem/pmdk. + + * -Drequire.pmdk is used to build the project with PMDK libraries forcibly. With this + option provided, the build will fail if libpmem library is not found. If this option + is not given, the build will generate a version of Hadoop with libhadoop.so. + And storage class memory(SCM) backed HDFS cache is still supported without PMDK involved. + Because PMDK can bring better caching write/read performance, it is recommended to build + the project with this option if user plans to use SCM backed HDFS cache. + * -Dpmdk.lib is used to specify a nonstandard location for PMDK libraries if they are not + under /usr/lib or /usr/lib64. + * -Dbundle.pmdk is used to copy the specified libpmem libraries into the distribution tar + package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided, + the build will fail if -Dpmdk.lib is not specified. + ---------------------------------------------------------------------------------- Building components separately diff --git a/dev-support/bin/dist-copynativelibs b/dev-support/bin/dist-copynativelibs index 67d2edf..4a783f0 100755 --- a/dev-support/bin/dist-copynativelibs +++ b/dev-support/bin/dist-copynativelibs @@ -96,6 +96,12 @@ for i in "$@"; do --isalbundle=*) ISALBUNDLE=${i#*=} ;; + --pmdklib=*) + PMDKLIB=${i#*=} + ;; + --pmdkbundle=*) + PMDKBUNDLE=${i#*=} + ;; --opensslbinbundle=*) OPENSSLBINBUNDLE=${i#*=} ;; @@ -153,6 +159,8 @@ if [[ -d "${LIB_DIR}" ]]; then bundle_native_lib "${OPENSSLLIBBUNDLE}" "openssl.lib" "crypto" "${OPENSSLLIB}" bundle_native_lib "${ISALBUNDLE}" "isal.lib" "isa" "${ISALLIB}" + + bundle_native_lib "${PMDKBUNDLE}" "pmdk.lib" "pmdk" "${PMDKLIB}" fi # Windows diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 23bd270..5f48fb8 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -713,6 +713,8 @@ <REQUIRE_ISAL>${require.isal} </REQUIRE_ISAL> <CUSTOM_ISAL_PREFIX>${isal.prefix} </CUSTOM_ISAL_PREFIX> <CUSTOM_ISAL_LIB>${isal.lib} </CUSTOM_ISAL_LIB> + <REQUIRE_PMDK>${require.pmdk}</REQUIRE_PMDK> + <CUSTOM_PMDK_LIB>${pmdk.lib}</CUSTOM_PMDK_LIB> <REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL> <CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX> <CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB> diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt index b9287c0..771c685 100644 --- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt +++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt @@ -121,6 +121,7 @@ else () ENDIF(REQUIRE_ZSTD) endif () +#Require ISA-L set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) hadoop_set_find_shared_library_version("2") find_library(ISAL_LIBRARY @@ -159,6 +160,25 @@ else (ISAL_LIBRARY) ENDIF(REQUIRE_ISAL) endif (ISAL_LIBRARY) +# Build with PMDK library if -Drequire.pmdk option is specified. +if(REQUIRE_PMDK) + set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) + hadoop_set_find_shared_library_version("1") + find_library(PMDK_LIBRARY + NAMES pmem + PATHS ${CUSTOM_PMDK_LIB} /usr/lib /usr/lib64) + set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) + + if(PMDK_LIBRARY) + GET_FILENAME_COMPONENT(HADOOP_PMDK_LIBRARY ${PMDK_LIBRARY} NAME) + set(PMDK_SOURCE_FILES ${SRC}/io/nativeio/pmdk_load.c) + else(PMDK_LIBRARY) + MESSAGE(FATAL_ERROR "The required PMDK library is NOT found. PMDK_LIBRARY=${PMDK_LIBRARY}") + endif(PMDK_LIBRARY) +else(REQUIRE_PMDK) + MESSAGE(STATUS "Build without PMDK support.") +endif(REQUIRE_PMDK) + # Build hardware CRC32 acceleration, if supported on the platform. if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64") set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c") @@ -256,6 +276,7 @@ hadoop_add_dual_library(hadoop ${SRC}/io/compress/zlib/ZlibDecompressor.c ${BZIP2_SOURCE_FILES} ${SRC}/io/nativeio/NativeIO.c + ${PMDK_SOURCE_FILES} ${SRC}/io/nativeio/errno_enum.c ${SRC}/io/nativeio/file_descriptor.c ${SRC}/io/nativeio/SharedFileDescriptorFactory.c diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake index 40aa467..7e23a5d 100644 --- a/hadoop-common-project/hadoop-common/src/config.h.cmake +++ b/hadoop-common-project/hadoop-common/src/config.h.cmake @@ -24,6 +24,7 @@ #cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@" #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@" #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@" +#cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@" #cmakedefine HAVE_SYNC_FILE_RANGE #cmakedefine HAVE_POSIX_FADVISE diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java index 4e0cd8f..1d0eab7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java @@ -100,6 +100,48 @@ public class NativeIO { write. */ public static int SYNC_FILE_RANGE_WAIT_AFTER = 4; + /** + * Keeps the support state of PMDK. + */ + public enum SupportState { + UNSUPPORTED(-1), + PMDK_LIB_NOT_FOUND(1), + SUPPORTED(0); + + private byte stateCode; + SupportState(int stateCode) { + this.stateCode = (byte) stateCode; + } + + public int getStateCode() { + return stateCode; + } + + public String getMessage() { + String msg; + switch (stateCode) { + case -1: + msg = "The native code is built without PMDK support."; + break; + case 1: + msg = "The native code is built with PMDK support, but PMDK libs " + + "are NOT found in execution environment or failed to be loaded."; + break; + case 0: + msg = "The native code is built with PMDK support, and PMDK libs " + + "are loaded successfully."; + break; + default: + msg = "The state code: " + stateCode + " is unrecognized!"; + } + return msg; + } + } + + // Denotes the state of supporting PMDK. The value is set by JNI. + private static SupportState pmdkSupportState = + SupportState.PMDK_LIB_NOT_FOUND; + private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class); // Set to true via JNI if possible @@ -124,6 +166,93 @@ public class NativeIO { POSIX.cacheManipulator = cacheManipulator; } + // This method is invoked by JNI. + public static void setPmdkSupportState(int stateCode) { + for (SupportState state : SupportState.values()) { + if (state.getStateCode() == stateCode) { + pmdkSupportState = state; + return; + } + } + LOG.error("The state code: " + stateCode + " is unrecognized!"); + } + + public static boolean isPmdkAvailable() { + LOG.info(pmdkSupportState.getMessage()); + return pmdkSupportState == SupportState.SUPPORTED; + } + + /** + * Denote memory region for a file mapped. + */ + public static class PmemMappedRegion { + private long address; + private long length; + private boolean isPmem; + + public PmemMappedRegion(long address, long length, boolean isPmem) { + this.address = address; + this.length = length; + this.isPmem = isPmem; + } + + public boolean isPmem() { + return this.isPmem; + } + + public long getAddress() { + return this.address; + } + + public long getLength() { + return this.length; + } + } + + /** + * JNI wrapper of persist memory operations. + */ + public static class Pmem { + // check whether the address is a Pmem address or DIMM address + public static boolean isPmem(long address, long length) { + return NativeIO.POSIX.isPmemCheck(address, length); + } + + // create a pmem file and memory map it + public static PmemMappedRegion mapBlock(String path, long length) { + return NativeIO.POSIX.pmemCreateMapFile(path, length); + } + + // unmap a pmem file + public static boolean unmapBlock(long address, long length) { + return NativeIO.POSIX.pmemUnMap(address, length); + } + + // copy data from disk file(src) to pmem file(dest), without flush + public static void memCopy(byte[] src, long dest, boolean isPmem, + long length) { + NativeIO.POSIX.pmemCopy(src, dest, isPmem, length); + } + + // flush the memory content to persistent storage + public static void memSync(PmemMappedRegion region) { + if (region.isPmem()) { + NativeIO.POSIX.pmemDrain(); + } else { + NativeIO.POSIX.pmemSync(region.getAddress(), region.getLength()); + } + } + } + + private static native boolean isPmemCheck(long address, long length); + private static native PmemMappedRegion pmemCreateMapFile(String path, + long length); + private static native boolean pmemUnMap(long address, long length); + private static native void pmemCopy(byte[] src, long dest, boolean isPmem, + long length); + private static native void pmemDrain(); + private static native void pmemSync(long address, long length); + /** * Used to manipulate the operating system cache. */ @@ -143,8 +272,8 @@ public class NativeIO { } public void posixFadviseIfPossible(String identifier, - FileDescriptor fd, long offset, long len, int flags) - throws NativeIOException { + FileDescriptor fd, long offset, long len, int flags) + throws NativeIOException { NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset, len, flags); } @@ -748,7 +877,7 @@ public class NativeIO { * user account name, of the format DOMAIN\UserName. This method * will remove the domain part of the full logon name. * - * @param Fthe full principal name containing the domain + * @param name the full principal name containing the domain * @return name with domain removed */ private static String stripDomain(String name) { diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c index 2274d57..3a0641b 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c @@ -36,6 +36,10 @@ #include <sys/resource.h> #include <sys/stat.h> #include <sys/syscall.h> +#ifdef HADOOP_PMDK_LIBRARY +#include <libpmem.h> +#include "pmdk_load.h" +#endif #if !(defined(__FreeBSD__) || defined(__MACH__)) #include <sys/sendfile.h> #endif @@ -60,6 +64,7 @@ #define NATIVE_IO_POSIX_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX" #define NATIVE_IO_STAT_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat" +#define NATIVE_IO_POSIX_PMEMREGION_CLASS "org/apache/hadoop/io/nativeio/NativeIO$POSIX$PmemMappedRegion" #define SET_INT_OR_RETURN(E, C, F) \ { \ @@ -81,6 +86,12 @@ static jmethodID nioe_ctor; // Please see HADOOP-7156 for details. jobject pw_lock_object; +#ifdef HADOOP_PMDK_LIBRARY +// the NativeIO$POSIX$PmemMappedRegion inner class and its constructor +static jclass pmem_region_clazz = NULL; +static jmethodID pmem_region_ctor = NULL; +#endif + /* * Throw a java.IO.IOException, generating the message from errno. * NB. this is also used form windows_secure_container_executor.c @@ -269,6 +280,63 @@ static void nioe_deinit(JNIEnv *env) { nioe_ctor = NULL; } +#ifdef HADOOP_PMDK_LIBRARY +static int loadPmdkLib(JNIEnv *env) { + char errMsg[1024]; + jclass clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_CLASS); + if (clazz == NULL) { + return 0; // exception has been raised + } + load_pmdk_lib(errMsg, sizeof(errMsg)); + jmethodID mid = (*env)->GetStaticMethodID(env, clazz, "setPmdkSupportState", "(I)V"); + if (mid == 0) { + return 0; + } + if (strlen(errMsg) > 0) { + (*env)->CallStaticVoidMethod(env, clazz, mid, 1); + return 0; + } + (*env)->CallStaticVoidMethod(env, clazz, mid, 0); + return 1; +} + +static void pmem_region_init(JNIEnv *env, jclass nativeio_class) { + + jclass clazz = NULL; + // Init Stat + clazz = (*env)->FindClass(env, NATIVE_IO_POSIX_PMEMREGION_CLASS); + if (!clazz) { + THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion class"); + return; // exception has been raised + } + + // Init PmemMappedRegion class + pmem_region_clazz = (*env)->NewGlobalRef(env, clazz); + if (!pmem_region_clazz) { + THROW(env, "java/io/IOException", "Failed to new global reference of PmemMappedRegion class"); + return; // exception has been raised + } + + pmem_region_ctor = (*env)->GetMethodID(env, pmem_region_clazz, "<init>", "(JJZ)V"); + if (!pmem_region_ctor) { + THROW(env, "java/io/IOException", "Failed to get PmemMappedRegion constructor"); + return; // exception has been raised + } +} + +static void pmem_region_deinit(JNIEnv *env) { + if (pmem_region_ctor != NULL) { + (*env)->DeleteGlobalRef(env, pmem_region_ctor); + pmem_region_ctor = NULL; + } + + if (pmem_region_clazz != NULL) { + (*env)->DeleteGlobalRef(env, pmem_region_clazz); + pmem_region_clazz = NULL; + } + } +#endif + /* * private static native void initNative(); * @@ -292,6 +360,11 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_initNative( #ifdef UNIX errno_enum_init(env); PASS_EXCEPTIONS_GOTO(env, error); +#ifdef HADOOP_PMDK_LIBRARY + if (loadPmdkLib(env)) { + pmem_region_init(env, clazz); + } +#endif #endif return; error: @@ -299,6 +372,9 @@ error: // class wasn't initted yet #ifdef UNIX stat_deinit(env); +#ifdef HADOOP_PMDK_LIBRARY + pmem_region_deinit(env); +#endif #endif nioe_deinit(env); fd_deinit(env); @@ -1383,3 +1459,179 @@ cleanup: /** * vim: sw=2: ts=2: et: */ + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX + * Method: isPmemCheck + * Signature: (JJ)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_isPmemCheck( +JNIEnv *env, jclass thisClass, jlong address, jlong length) { + #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) + jint is_pmem = pmdkLoader->pmem_is_pmem(address, length); + return (is_pmem) ? JNI_TRUE : JNI_FALSE; + #else + THROW(env, "java/lang/UnsupportedOperationException", + "The function isPmemCheck is not supported."); + return JNI_FALSE; + #endif + } + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX + * Method: pmemCreateMapFile + * Signature: (Ljava/lang/String;J)Lorg/apache/hadoop/io/nativeio/NativeIO/POSIX/PmemMappedRegion; + */ +JNIEXPORT jobject JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCreateMapFile( +JNIEnv *env, jclass thisClass, jstring filePath, jlong fileLength) { + #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) + /* create a pmem file and memory map it */ + const char * path = NULL; + void * pmemaddr = NULL; + size_t mapped_len = 0; + int is_pmem = 1; + char msg[1000]; + + path = (*env)->GetStringUTFChars(env, filePath, NULL); + if (!path) { + THROW(env, "java/lang/IllegalArgumentException", "File Path cannot be null"); + return NULL; + } + + if (fileLength <= 0) { + (*env)->ReleaseStringUTFChars(env, filePath, path); + THROW(env, "java/lang/IllegalArgumentException", "File length should be positive"); + return NULL; + } + + pmemaddr = pmdkLoader->pmem_map_file(path, fileLength, PMEM_FILE_CREATE|PMEM_FILE_EXCL, + 0666, &mapped_len, &is_pmem); + + if (!pmemaddr) { + snprintf(msg, sizeof(msg), "Failed to create pmem file. file: %s, length: %x, error msg: %s", path, fileLength, pmem_errormsg()); + THROW(env, "java/io/IOException", msg); + (*env)->ReleaseStringUTFChars(env, filePath, path); + return NULL; + } + + if (fileLength != mapped_len) { + snprintf(msg, sizeof(msg), "Mapped length doesn't match the request length. file :%s, request length:%x, returned length:%x, error msg:%s", path, fileLength, mapped_len, pmem_errormsg()); + THROW(env, "java/io/IOException", msg); + (*env)->ReleaseStringUTFChars(env, filePath, path); + return NULL; + } + + (*env)->ReleaseStringUTFChars(env, filePath, path); + + if ((!pmem_region_clazz) || (!pmem_region_ctor)) { + THROW(env, "java/io/IOException", "PmemMappedRegion class or constructor is NULL"); + return NULL; + } + + jobject ret = (*env)->NewObject(env, pmem_region_clazz, pmem_region_ctor, pmemaddr, mapped_len, (jboolean)is_pmem); + return ret; + + #else + THROW(env, "java/lang/UnsupportedOperationException", + "The function pmemCreateMapFile is not supported."); + return NULL; + #endif + } + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX + * Method: pmemUnMap + * Signature: (JJ)V + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemUnMap( +JNIEnv *env, jclass thisClass, jlong address, jlong length) { + #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) + int succeed = 0; + char msg[1000]; + succeed = pmdkLoader->pmem_unmap(address, length); + // succeed = -1 failure; succeed = 0 success + if (succeed != 0) { + snprintf(msg, sizeof(msg), "Failed to unmap region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg()); + THROW(env, "java/io/IOException", msg); + return JNI_FALSE; + } else { + return JNI_TRUE; + } + #else + THROW(env, "java/lang/UnsupportedOperationException", + "The function pmemUnMap is not supported."); + return JNI_FALSE; + #endif + } + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX + * Method: pmemCopy + * Signature: ([BJJ)V + */ +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemCopy( +JNIEnv *env, jclass thisClass, jbyteArray buf, jlong address, jboolean is_pmem, jlong length) { + #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) + char msg[1000]; + jbyte* srcBuf = (*env)->GetByteArrayElements(env, buf, 0); + snprintf(msg, sizeof(msg), "Pmem copy content. dest: %x, length: %x, src: %x ", address, length, srcBuf); + if (is_pmem) { + pmdkLoader->pmem_memcpy_nodrain(address, srcBuf, length); + } else { + memcpy(address, srcBuf, length); + } + (*env)->ReleaseByteArrayElements(env, buf, srcBuf, 0); + return; + #else + THROW(env, "java/lang/UnsupportedOperationException", + "The function pmemCopy is not supported."); + #endif + } + +/* + * Class: org_apache_hadoop_io_nativeio_NativeIO + * Method: pmemDrain + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemDrain( +JNIEnv *env, jclass thisClass) { + #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) + pmdkLoader->pmem_drain(); + #else + THROW(env, "java/lang/UnsupportedOperationException", + "The function pmemDrain is not supported."); + #endif + } + + /* + * Class: org_apache_hadoop_io_nativeio_NativeIO_POSIX + * Method: pmemSync + * Signature: (JJ)V + */ +JNIEXPORT void JNICALL Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_pmemSync + (JNIEnv * env, jclass thisClass, jlong address, jlong length) { + + #if (defined UNIX) && (defined HADOOP_PMDK_LIBRARY) + int succeed = 0; + char msg[1000]; + succeed = pmdkLoader->pmem_msync(address, length); + // succeed = -1 failure + if (succeed = -1) { + snprintf(msg, sizeof(msg), "Failed to msync region. address: %x, length: %x, error msg: %s", address, length, pmem_errormsg()); + THROW(env, "java/io/IOException", msg); + return; + } + #else + THROW(env, "java/lang/UnsupportedOperationException", + "The function pmemSync is not supported."); + #endif + } + + +#ifdef __cplusplus +} +#endif diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c new file mode 100644 index 0000000..f7d6cfb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.c @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "org_apache_hadoop.h" +#include "pmdk_load.h" +#include "org_apache_hadoop_io_nativeio_NativeIO.h" +#include "org_apache_hadoop_io_nativeio_NativeIO_POSIX.h" + +#ifdef UNIX +#include <sys/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <dlfcn.h> + +#include "config.h" +#endif + +PmdkLibLoader * pmdkLoader; + +/** + * pmdk_load.c + * Utility of loading the libpmem library and the required functions. + * Building of this codes won't rely on any libpmem source codes, but running + * into this will rely on successfully loading of the dynamic library. + * + */ + +static const char* load_functions() { +#ifdef UNIX + PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_map_file), "pmem_map_file"); + PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_unmap), "pmem_unmap"); + PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_is_pmem), "pmem_is_pmem"); + PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_drain), "pmem_drain"); + PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_memcpy_nodrain), "pmem_memcpy_nodrain"); + PMDK_LOAD_DYNAMIC_SYMBOL((pmdkLoader->pmem_msync), "pmem_msync"); +#endif + return NULL; +} + +void load_pmdk_lib(char* err, size_t err_len) { + const char* errMsg; + const char* library = NULL; +#ifdef UNIX + Dl_info dl_info; +#else + LPTSTR filename = NULL; +#endif + + err[0] = '\0'; + + if (pmdkLoader != NULL) { + return; + } + pmdkLoader = calloc(1, sizeof(PmdkLibLoader)); + + // Load PMDK library + #ifdef UNIX + pmdkLoader->libec = dlopen(HADOOP_PMDK_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (pmdkLoader->libec == NULL) { + snprintf(err, err_len, "Failed to load %s (%s)", + HADOOP_PMDK_LIBRARY, dlerror()); + return; + } + // Clear any existing error + dlerror(); + #endif + errMsg = load_functions(pmdkLoader->libec); + if (errMsg != NULL) { + snprintf(err, err_len, "Loading functions from PMDK failed: %s", errMsg); + } + +#ifdef UNIX + if(dladdr(pmdkLoader->pmem_map_file, &dl_info)) { + library = dl_info.dli_fname; + } +#else + if (GetModuleFileName(pmdkLoader->libec, filename, 256) > 0) { + library = filename; + } +#endif + + if (library == NULL) { + library = HADOOP_PMDK_LIBRARY; + } + + pmdkLoader->libname = strdup(library); +} diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h new file mode 100644 index 0000000..c93a076 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/pmdk_load.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "org_apache_hadoop.h" + +#ifdef UNIX +#include <sys/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <dlfcn.h> +#endif + +#ifndef _PMDK_LOAD_H_ +#define _PMDK_LOAD_H_ + + +#ifdef UNIX +// For libpmem.h +typedef void * (*__d_pmem_map_file)(const char *path, size_t len, int flags, mode_t mode, + size_t *mapped_lenp, int *is_pmemp); +typedef int (* __d_pmem_unmap)(void *addr, size_t len); +typedef int (*__d_pmem_is_pmem)(const void *addr, size_t len); +typedef void (*__d_pmem_drain)(void); +typedef void * (*__d_pmem_memcpy_nodrain)(void *pmemdest, const void *src, size_t len); +typedef int (* __d_pmem_msync)(const void *addr, size_t len); + +#endif + +typedef struct __PmdkLibLoader { + // The loaded library handle + void* libec; + char* libname; + __d_pmem_map_file pmem_map_file; + __d_pmem_unmap pmem_unmap; + __d_pmem_is_pmem pmem_is_pmem; + __d_pmem_drain pmem_drain; + __d_pmem_memcpy_nodrain pmem_memcpy_nodrain; + __d_pmem_msync pmem_msync; +} PmdkLibLoader; + +extern PmdkLibLoader * pmdkLoader; + +/** + * A helper function to dlsym a 'symbol' from a given library-handle. + */ + +#ifdef UNIX + +static __attribute__ ((unused)) +void *myDlsym(void *handle, const char *symbol) { + void *func_ptr = dlsym(handle, symbol); + return func_ptr; +} + +/* A helper macro to dlsym the requisite dynamic symbol in NON-JNI env. */ +#define PMDK_LOAD_DYNAMIC_SYMBOL(func_ptr, symbol) \ + if ((func_ptr = myDlsym(pmdkLoader->libec, symbol)) == NULL) { \ + return "Failed to load symbol" symbol; \ + } + +#endif + +/** + * Return 0 if not support, 1 otherwise. + */ +int build_support_pmdk(); + +/** + * Initialize and load PMDK library, returning error message if any. + * + * @param err The err message buffer. + * @param err_len The length of the message buffer. + */ +void load_pmdk_lib(char* err, size_t err_len); + +#endif //_PMDK_LOAD_H_ \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java index 6b3c232..a14928c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java @@ -25,6 +25,8 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Paths; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; @@ -782,4 +784,155 @@ public class TestNativeIO { assertTrue("Native POSIX_FADV_NOREUSE const not set", POSIX_FADV_NOREUSE >= 0); } + + + @Test (timeout=10000) + public void testPmemCheckParameters() { + assumeNotWindows("Native PMDK not supported on Windows"); + // Skip testing while the build or environment does not support PMDK + assumeTrue(NativeIO.POSIX.isPmdkAvailable()); + + // Please make sure /mnt/pmem0 is a persistent memory device with total + // volume size 'volumeSize' + String filePath = "/$:"; + long length = 0; + long volumnSize = 16 * 1024 * 1024 * 1024L; + + // Incorrect file length + try { + NativeIO.POSIX.Pmem.mapBlock(filePath, length); + fail("Illegal length parameter should be detected"); + } catch (Exception e) { + LOG.info(e.getMessage()); + } + + // Incorrect file length + filePath = "/mnt/pmem0/test_native_io"; + length = -1L; + try { + NativeIO.POSIX.Pmem.mapBlock(filePath, length); + fail("Illegal length parameter should be detected"); + }catch (Exception e) { + LOG.info(e.getMessage()); + } + } + + @Test (timeout=10000) + public void testPmemMapMultipleFiles() { + assumeNotWindows("Native PMDK not supported on Windows"); + // Skip testing while the build or environment does not support PMDK + assumeTrue(NativeIO.POSIX.isPmdkAvailable()); + + // Please make sure /mnt/pmem0 is a persistent memory device with total + // volume size 'volumeSize' + String filePath = "/mnt/pmem0/test_native_io"; + long length = 0; + long volumnSize = 16 * 1024 * 1024 * 1024L; + + // Multiple files, each with 128MB size, aggregated size exceeds volume + // limit 16GB + length = 128 * 1024 * 1024L; + long fileNumber = volumnSize / length; + LOG.info("File number = " + fileNumber); + for (int i = 0; i < fileNumber; i++) { + String path = filePath + i; + LOG.info("File path = " + path); + NativeIO.POSIX.Pmem.mapBlock(path, length); + } + try { + NativeIO.POSIX.Pmem.mapBlock(filePath, length); + fail("Request map extra file when persistent memory is all occupied"); + } catch (Exception e) { + LOG.info(e.getMessage()); + } + } + + @Test (timeout=10000) + public void testPmemMapBigFile() { + assumeNotWindows("Native PMDK not supported on Windows"); + // Skip testing while the build or environment does not support PMDK + assumeTrue(NativeIO.POSIX.isPmdkAvailable()); + + // Please make sure /mnt/pmem0 is a persistent memory device with total + // volume size 'volumeSize' + String filePath = "/mnt/pmem0/test_native_io_big"; + long length = 0; + long volumeSize = 16 * 1024 * 1024 * 1024L; + + // One file length exceeds persistent memory volume 16GB. + length = volumeSize + 1024L; + try { + LOG.info("File length = " + length); + NativeIO.POSIX.Pmem.mapBlock(filePath, length); + fail("File length exceeds persistent memory total volume size"); + }catch (Exception e) { + LOG.info(e.getMessage()); + deletePmemMappedFile(filePath); + } + } + + @Test (timeout=10000) + public void testPmemCopy() throws IOException { + assumeNotWindows("Native PMDK not supported on Windows"); + // Skip testing while the build or environment does not support PMDK + assumeTrue(NativeIO.POSIX.isPmdkAvailable()); + + // Create and map a block file. Please make sure /mnt/pmem0 is a persistent + // memory device. + String filePath = "/mnt/pmem0/copy"; + long length = 4096; + PmemMappedRegion region = NativeIO.POSIX.Pmem.mapBlock(filePath, length); + assertTrue(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length)); + assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress(), length + 100)); + assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() + 100, length)); + assertFalse(NativeIO.POSIX.Pmem.isPmem(region.getAddress() - 100, length)); + + // Copy content to mapped file + byte[] data = generateSequentialBytes(0, (int) length); + NativeIO.POSIX.Pmem.memCopy(data, region.getAddress(), region.isPmem(), + length); + + // Read content before pmemSync + byte[] readBuf1 = new byte[(int)length]; + IOUtils.readFully(new FileInputStream(filePath), readBuf1, 0, (int)length); + assertArrayEquals(data, readBuf1); + + byte[] readBuf2 = new byte[(int)length]; + // Sync content to persistent memory twice + NativeIO.POSIX.Pmem.memSync(region); + NativeIO.POSIX.Pmem.memSync(region); + // Read content after pmemSync twice + IOUtils.readFully(new FileInputStream(filePath), readBuf2, 0, (int)length); + assertArrayEquals(data, readBuf2); + + //Read content after unmap twice + NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length); + NativeIO.POSIX.Pmem.unmapBlock(region.getAddress(), length); + byte[] readBuf3 = new byte[(int)length]; + IOUtils.readFully(new FileInputStream(filePath), readBuf3, 0, (int)length); + assertArrayEquals(data, readBuf3); + } + + private static byte[] generateSequentialBytes(int start, int length) { + byte[] result = new byte[length]; + + for (int i = 0; i < length; i++) { + result[i] = (byte) ((start + i) % 127); + } + return result; + } + + private static void deletePmemMappedFile(String filePath) { + try { + if (filePath != null) { + boolean result = Files.deleteIfExists(Paths.get(filePath)); + if (!result) { + throw new IOException(); + } + } + } catch (Throwable e) { + LOG.error("Failed to delete the mapped file " + filePath + + " from persistent memory", e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 4fab214..37e548e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -215,6 +215,28 @@ public class FsDatasetCache { } /** + * Get cache address on persistent memory for read operation. + * The cache address comes from PMDK lib function when mapping + * block to persistent memory. + * + * @param bpid blockPoolId + * @param blockId blockId + * @return address + */ + long getCacheAddress(String bpid, long blockId) { + if (cacheLoader.isTransientCache() || + !isCached(bpid, blockId)) { + return -1; + } + if (!(cacheLoader.isNativeLoader())) { + return -1; + } + ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); + MappableBlock mappableBlock = mappableBlockMap.get(key).mappableBlock; + return mappableBlock.getAddress(); + } + + /** * @return List of cached blocks suitable for translation into a * {@link BlockListAsLongs} for a cache report. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 801b4c6..ee76f2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -815,6 +815,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { String cachePath = cacheManager.getReplicaCachePath( b.getBlockPoolId(), b.getBlockId()); if (cachePath != null) { + long addr = cacheManager.getCacheAddress( + b.getBlockPoolId(), b.getBlockId()); + if (addr != -1) { + LOG.debug("Get InputStream by cache address."); + return FsDatasetUtil.getDirectInputStream( + addr, info.getBlockDataLength()); + } + LOG.debug("Get InputStream by cache file path."); return FsDatasetUtil.getInputStreamAndSeek( new File(cachePath), seekOffset); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java index 92c0888..c2bc703 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java @@ -26,7 +26,10 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.file.Files; import java.nio.file.Paths; @@ -43,6 +46,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; +import org.apache.htrace.shaded.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; /** Utility methods. */ @InterfaceAudience.Private @@ -132,6 +136,24 @@ public class FsDatasetUtil { } } + public static InputStream getDirectInputStream(long addr, long length) + throws IOException { + try { + Class<?> directByteBufferClass = + Class.forName("java.nio.DirectByteBuffer"); + Constructor<?> constructor = + directByteBufferClass.getDeclaredConstructor(long.class, int.class); + constructor.setAccessible(true); + ByteBuffer byteBuffer = + (ByteBuffer) constructor.newInstance(addr, (int)length); + return new ByteBufferBackedInputStream(byteBuffer); + } catch (ClassNotFoundException | NoSuchMethodException | + IllegalAccessException | InvocationTargetException | + InstantiationException e) { + throw new IOException(e); + } + } + /** * Find the meta-file for the specified block file and then return the * generation stamp from the name of the meta-file. Generally meta file will diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index 0fff327..a00c442 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -35,4 +35,10 @@ public interface MappableBlock extends Closeable { * @return the number of bytes that have been cached. */ long getLength(); + + /** + * Get cache address if applicable. + * Return -1 if not applicable. + */ + long getAddress(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java index 3ec8416..5b9ba3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java @@ -64,8 +64,7 @@ public abstract class MappableBlockLoader { * @return The Mappable block. */ abstract MappableBlock load(long length, FileInputStream blockIn, - FileInputStream metaIn, String blockFileName, - ExtendedBlockId key) + FileInputStream metaIn, String blockFileName, ExtendedBlockId key) throws IOException; /** @@ -107,6 +106,11 @@ public abstract class MappableBlockLoader { abstract boolean isTransientCache(); /** + * Check whether this is a native pmem cache loader. + */ + abstract boolean isNativeLoader(); + + /** * Clean up cache, can be used during DataNode shutdown. */ void shutdown() { @@ -117,8 +121,7 @@ public abstract class MappableBlockLoader { * Verifies the block's checksum. This is an I/O intensive operation. */ protected void verifyChecksum(long length, FileInputStream metaIn, - FileChannel blockChannel, String blockFileName) - throws IOException { + FileChannel blockChannel, String blockFileName) throws IOException { // Verify the checksum from the block's meta file // Get the DataChecksum from the meta file header BlockMetadataHeader header = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java index 43b1b53..6569373 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoaderFactory.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.datanode.DNConf; +import org.apache.hadoop.io.nativeio.NativeIO; /** * Creates MappableBlockLoader. @@ -42,6 +43,9 @@ public final class MappableBlockLoaderFactory { if (conf.getPmemVolumes() == null || conf.getPmemVolumes().length == 0) { return new MemoryMappableBlockLoader(); } + if (NativeIO.isAvailable() && NativeIO.POSIX.isPmdkAvailable()) { + return new NativePmemMappableBlockLoader(); + } return new PmemMappableBlockLoader(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java index 52d8d93..dd4188c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappableBlockLoader.java @@ -66,8 +66,7 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader { */ @Override MappableBlock load(long length, FileInputStream blockIn, - FileInputStream metaIn, String blockFileName, - ExtendedBlockId key) + FileInputStream metaIn, String blockFileName, ExtendedBlockId key) throws IOException { MemoryMappedBlock mappableBlock = null; MappedByteBuffer mmap = null; @@ -116,4 +115,9 @@ public class MemoryMappableBlockLoader extends MappableBlockLoader { public boolean isTransientCache() { return true; } + + @Override + public boolean isNativeLoader() { + return false; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java index c09ad1a..47dfeae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MemoryMappedBlock.java @@ -45,6 +45,11 @@ public class MemoryMappedBlock implements MappableBlock { } @Override + public long getAddress() { + return -1L; + } + + @Override public void close() { if (mmap != null) { NativeIO.POSIX.munmap(mmap); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java similarity index 53% copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java index 3ec8416..09e9454 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappableBlockLoader.java @@ -24,7 +24,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX; import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataInputStream; @@ -34,21 +38,26 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** - * Maps block to DataNode cache region. + * Map block to persistent memory with native PMDK libs. */ @InterfaceAudience.Private @InterfaceStability.Unstable -public abstract class MappableBlockLoader { +public class NativePmemMappableBlockLoader extends PmemMappableBlockLoader { + private static final Logger LOG = + LoggerFactory.getLogger(NativePmemMappableBlockLoader.class); - /** - * Initialize a specific MappableBlockLoader. - */ - abstract void initialize(FsDatasetCache cacheManager) throws IOException; + @Override + void initialize(FsDatasetCache cacheManager) throws IOException { + super.initialize(cacheManager); + } /** * Load the block. * - * Map the block, and then verify its checksum. + * Map the block and verify its checksum. + * + * The block will be mapped to PmemDir/BlockPoolId-BlockId, in which PmemDir + * is a persistent memory volume chosen by PmemVolumeManager. * * @param length The current length of the block. * @param blockIn The block input stream. Should be positioned at the @@ -58,67 +67,62 @@ public abstract class MappableBlockLoader { * @param blockFileName The block file name, for logging purposes. * @param key The extended block ID. * - * @throws IOException If mapping block to cache region fails or checksum - * fails. + * @throws IOException If mapping block to persistent memory fails or + * checksum fails. * * @return The Mappable block. */ - abstract MappableBlock load(long length, FileInputStream blockIn, - FileInputStream metaIn, String blockFileName, - ExtendedBlockId key) - throws IOException; - - /** - * Try to reserve some given bytes. - * - * @param key The ExtendedBlockId for a block. - * - * @param bytesCount The number of bytes to add. - * - * @return The new number of usedBytes if we succeeded; - * -1 if we failed. - */ - abstract long reserve(ExtendedBlockId key, long bytesCount); - - /** - * Release some bytes that we're using. - * - * @param key The ExtendedBlockId for a block. - * - * @param bytesCount The number of bytes to release. - * - * @return The new number of usedBytes. - */ - abstract long release(ExtendedBlockId key, long bytesCount); - - /** - * Get the approximate amount of cache space used. - */ - abstract long getCacheUsed(); - - /** - * Get the maximum amount of cache bytes. - */ - abstract long getCacheCapacity(); + @Override + public MappableBlock load(long length, FileInputStream blockIn, + FileInputStream metaIn, String blockFileName, + ExtendedBlockId key) + throws IOException { + NativePmemMappedBlock mappableBlock = null; + POSIX.PmemMappedRegion region = null; + String filePath = null; - /** - * Check whether the cache is non-volatile. - */ - abstract boolean isTransientCache(); + FileChannel blockChannel = null; + try { + blockChannel = blockIn.getChannel(); + if (blockChannel == null) { + throw new IOException("Block InputStream has no FileChannel."); + } - /** - * Clean up cache, can be used during DataNode shutdown. - */ - void shutdown() { - // Do nothing. + assert NativeIO.isAvailable(); + filePath = PmemVolumeManager.getInstance().getCachePath(key); + region = POSIX.Pmem.mapBlock(filePath, length); + if (region == null) { + throw new IOException("Failed to map the block " + blockFileName + + " to persistent storage."); + } + verifyChecksumAndMapBlock(region, length, metaIn, blockChannel, + blockFileName); + mappableBlock = new NativePmemMappedBlock(region.getAddress(), + region.getLength(), key); + LOG.info("Successfully cached one replica:{} into persistent memory" + + ", [cached path={}, address={}, length={}]", key, filePath, + region.getAddress(), length); + } finally { + IOUtils.closeQuietly(blockChannel); + if (mappableBlock == null) { + if (region != null) { + // unmap content from persistent memory + POSIX.Pmem.unmapBlock(region.getAddress(), + region.getLength()); + FsDatasetUtil.deleteMappedFile(filePath); + } + } + } + return mappableBlock; } /** - * Verifies the block's checksum. This is an I/O intensive operation. + * Verifies the block's checksum meanwhile map block to persistent memory. + * This is an I/O intensive operation. */ - protected void verifyChecksum(long length, FileInputStream metaIn, - FileChannel blockChannel, String blockFileName) - throws IOException { + private void verifyChecksumAndMapBlock(POSIX.PmemMappedRegion region, + long length, FileInputStream metaIn, FileChannel blockChannel, + String blockFileName) throws IOException { // Verify the checksum from the block's meta file // Get the DataChecksum from the meta file header BlockMetadataHeader header = @@ -129,8 +133,8 @@ public abstract class MappableBlockLoader { try { metaChannel = metaIn.getChannel(); if (metaChannel == null) { - throw new IOException( - "Block InputStream meta file has no FileChannel."); + throw new IOException("Cannot get FileChannel" + + " from Block InputStream meta file."); } DataChecksum checksum = header.getChecksum(); final int bytesPerChecksum = checksum.getBytesPerChecksum(); @@ -140,13 +144,19 @@ public abstract class MappableBlockLoader { ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks * checksumSize); // Verify the checksum int bytesVerified = 0; + long mappedAddress = -1L; + if (region != null) { + mappedAddress = region.getAddress(); + } while (bytesVerified < length) { Preconditions.checkState(bytesVerified % bytesPerChecksum == 0, - "Unexpected partial chunk before EOF"); + "Unexpected partial chunk before EOF."); assert bytesVerified % bytesPerChecksum == 0; int bytesRead = fillBuffer(blockChannel, blockBuf); if (bytesRead == -1) { - throw new IOException("checksum verification failed: premature EOF"); + throw new IOException( + "Checksum verification failed for the block " + blockFileName + + ": premature EOF"); } blockBuf.flip(); // Number of read chunks, including partial chunk at end @@ -158,32 +168,24 @@ public abstract class MappableBlockLoader { bytesVerified); // Success bytesVerified += bytesRead; + // Copy data to persistent file + POSIX.Pmem.memCopy(blockBuf.array(), mappedAddress, + region.isPmem(), bytesRead); + mappedAddress += bytesRead; + // Clear buffer blockBuf.clear(); checksumBuf.clear(); } + if (region != null) { + POSIX.Pmem.memSync(region); + } } finally { IOUtils.closeQuietly(metaChannel); } } - /** - * Reads bytes into a buffer until EOF or the buffer's limit is reached. - */ - protected int fillBuffer(FileChannel channel, ByteBuffer buf) - throws IOException { - int bytesRead = channel.read(buf); - if (bytesRead < 0) { - //EOF - return bytesRead; - } - while (buf.remaining() > 0) { - int n = channel.read(buf); - if (n < 0) { - //EOF - return bytesRead; - } - bytesRead += n; - } - return bytesRead; + @Override + public boolean isNativeLoader() { + return true; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java similarity index 52% copy from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java copy to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java index 25c3d40..92012b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/NativePmemMappedBlock.java @@ -21,25 +21,29 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.io.nativeio.NativeIO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** - * Represents an HDFS block that is mapped to persistent memory by DataNode - * with mapped byte buffer. PMDK is NOT involved in this implementation. + * Represents an HDFS block that is mapped to persistent memory by the DataNode. */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class PmemMappedBlock implements MappableBlock { +public class NativePmemMappedBlock implements MappableBlock { private static final Logger LOG = - LoggerFactory.getLogger(PmemMappedBlock.class); + LoggerFactory.getLogger(NativePmemMappedBlock.class); + + private long pmemMappedAddress = -1L; private long length; private ExtendedBlockId key; - PmemMappedBlock(long length, ExtendedBlockId key) { + NativePmemMappedBlock(long pmemMappedAddress, long length, + ExtendedBlockId key) { assert length > 0; + this.pmemMappedAddress = pmemMappedAddress; this.length = length; this.key = key; } @@ -50,15 +54,32 @@ public class PmemMappedBlock implements MappableBlock { } @Override + public long getAddress() { + return pmemMappedAddress; + } + + @Override public void close() { - String cacheFilePath = - PmemVolumeManager.getInstance().getCachePath(key); - try { - FsDatasetUtil.deleteMappedFile(cacheFilePath); - LOG.info("Successfully uncached one replica:{} from persistent memory" - + ", [cached path={}, length={}]", key, cacheFilePath, length); - } catch (IOException e) { - LOG.warn("Failed to delete the mapped File: {}!", cacheFilePath, e); + if (pmemMappedAddress != -1L) { + String cacheFilePath = + PmemVolumeManager.getInstance().getCachePath(key); + try { + // Current libpmem will report error when pmem_unmap is called with + // length not aligned with page size, although the length is returned + // by pmem_map_file. + boolean success = + NativeIO.POSIX.Pmem.unmapBlock(pmemMappedAddress, length); + if (!success) { + throw new IOException("Failed to unmap the mapped file from " + + "pmem address: " + pmemMappedAddress); + } + pmemMappedAddress = -1L; + FsDatasetUtil.deleteMappedFile(cacheFilePath); + LOG.info("Successfully uncached one replica:{} from persistent memory" + + ", [cached path={}, length={}]", key, cacheFilePath, length); + } catch (IOException e) { + LOG.warn("IOException occurred for block {}!", key, e); + } } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java index 239fff8..70a42c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappableBlockLoader.java @@ -43,7 +43,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader { @Override void initialize(FsDatasetCache cacheManager) throws IOException { - LOG.info("Initializing cache loader: PmemMappableBlockLoader."); + LOG.info("Initializing cache loader: " + this.getClass().getName()); DNConf dnConf = cacheManager.getDnConf(); PmemVolumeManager.init(dnConf.getPmemVolumes()); pmemVolumeManager = PmemVolumeManager.getInstance(); @@ -71,8 +71,7 @@ public class PmemMappableBlockLoader extends MappableBlockLoader { */ @Override MappableBlock load(long length, FileInputStream blockIn, - FileInputStream metaIn, String blockFileName, - ExtendedBlockId key) + FileInputStream metaIn, String blockFileName, ExtendedBlockId key) throws IOException { PmemMappedBlock mappableBlock = null; String cachePath = null; @@ -133,6 +132,11 @@ public class PmemMappableBlockLoader extends MappableBlockLoader { } @Override + public boolean isNativeLoader() { + return false; + } + + @Override void shutdown() { LOG.info("Clean up cache on persistent memory during shutdown."); PmemVolumeManager.getInstance().cleanup(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java index 25c3d40..a49626a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/PmemMappedBlock.java @@ -50,6 +50,11 @@ public class PmemMappedBlock implements MappableBlock { } @Override + public long getAddress() { + return -1L; + } + + @Override public void close() { String cacheFilePath = PmemVolumeManager.getInstance().getCachePath(key); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org