Repository: trafodion Updated Branches: refs/heads/master 072708f1b -> a6512c05c
[TRAFODION-3110] Refactor LOB access to use the new implementation of HdfsClient This feature is enabled by default. To disable, set a variable USE_LIBHDFS=1 in $TRAF_HOME/etc/ms.env and restart the trafodion cluster. This feature includes the following: 1. Uses single FSDataInputStream for each LOB column in a query as opposed to opening the hdfs file for every row. 2. Uses FSDataOutputStream to write the lob data but closes it immediately to allow concurrent writes to the hdfs file. HDFS supports a single writer at a time. Need to confirm if multiple writes can be done without the need for RMS lock feature. 3. Improved error messaging that displays the java exception stack to the end user. 4. LOB worker threads are no longer created. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/52f074af Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/52f074af Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/52f074af Branch: refs/heads/master Commit: 52f074af0e866fe82ea4f37eaf90cdc880871851 Parents: 5b5ab32 Author: selvaganesang <selva.govindara...@esgyn.com> Authored: Fri Jun 15 22:53:04 2018 +0000 Committer: selvaganesang <selva.govindara...@esgyn.com> Committed: Fri Jun 15 23:15:13 2018 +0000 ---------------------------------------------------------------------- core/sql/bin/SqlciErrors.txt | 2 +- core/sql/cli/Cli.cpp | 16 +- core/sql/executor/ExExeUtilGet.cpp | 46 +-- core/sql/executor/ExExeUtilLoad.cpp | 26 +- core/sql/executor/HdfsClient_JNI.cpp | 206 +++++++++- core/sql/executor/HdfsClient_JNI.h | 20 +- core/sql/executor/ex_globals.cpp | 2 +- core/sql/exp/ExpLOB.cpp | 12 +- core/sql/exp/ExpLOBaccess.cpp | 390 ++++++++++++------- core/sql/exp/ExpLOBaccess.h | 12 + core/sql/exp/ExpLOBinterface.cpp | 14 +- core/sql/regress/executor/EXPECTED130 | 4 +- .../main/java/org/trafodion/sql/HDFSClient.java | 113 +++++- 13 files changed, 607 insertions(+), 256 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/bin/SqlciErrors.txt ---------------------------------------------------------------------- diff --git a/core/sql/bin/SqlciErrors.txt b/core/sql/bin/SqlciErrors.txt index ee51272..4e644e8 100644 --- a/core/sql/bin/SqlciErrors.txt +++ b/core/sql/bin/SqlciErrors.txt @@ -1578,7 +1578,7 @@ $1~String1 -------------------------------- 8437 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Mismatch detected between external table and underlying hive table definitions. 8440 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The size of the history buffer is too small to execute one or more of the OLAP Windowed Functions in the query. 8441 ZZZZZ 99999 BEGINNER MAJOR DBADMIN one or more of the OLAP Windowed Functions in the query may require overflow which is not supported yet. -8442 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Unable to access $0~string0 interface. Call to $1~string1 returned error $2~string2($0~int0). Error detail $1~int1. +8442 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Unable to access $0~string0 interface. Call to $0~string0 returned error $1~string1($0~int0). Error detail: $1~int1. Cause: $2~string2. 8443 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Provided LOB handle is invalid. 8444 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Only one lob handle can be returned by child for data extract. 8445 ZZZZZ 99999 BEGINNER MAJOR DBADMIN An error occurred during transformation of hdfs row to sql row. 
Error Detail: $0~string0 http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/cli/Cli.cpp ---------------------------------------------------------------------- diff --git a/core/sql/cli/Cli.cpp b/core/sql/cli/Cli.cpp index a942bca..d641f6d 100644 --- a/core/sql/cli/Cli.cpp +++ b/core/sql/cli/Cli.cpp @@ -9397,7 +9397,7 @@ Lng32 SQLCLI_LOB_GC_Interface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"Lob GC call", - getLobErrStr(rc)); + getLobErrStr(rc), (char*)getSqlJniErrorStr()); // TBD When local transaction support is in // rollback all the updates to the lob desc chunks file too. // return with warning @@ -9533,8 +9533,7 @@ Lng32 SQLCLI_LOBddlInterface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"ExpLOBInterfaceCreate", - - getLobErrStr(rc)); + getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } @@ -9553,8 +9552,7 @@ Lng32 SQLCLI_LOBddlInterface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"ExpLOBInterfaceCreate", - - getLobErrStr(rc)); + getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } @@ -9650,7 +9648,7 @@ Lng32 SQLCLI_LOBddlInterface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"ExpLOBInterfaceCreate", - getLobErrStr(rc)); + getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } @@ -9668,7 +9666,7 @@ Lng32 SQLCLI_LOBddlInterface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"ExpLOBInterfaceDrop ", - getLobErrStr(rc)); + getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } }//for @@ -9700,7 +9698,7 @@ Lng32 SQLCLI_LOBddlInterface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"ExpLOBInterfaceCreate", - getLobErrStr(rc)); + 
getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } // drop descriptor table @@ -9718,7 +9716,7 @@ Lng32 SQLCLI_LOBddlInterface ExRaiseSqlError(currContext.exHeap(), &da, (ExeErrorCode)(8442), NULL, &cliRC , &rc, NULL, (char*)"ExpLOBInterfaceDrop ", - getLobErrStr(rc)); + getLobErrStr(rc), (char*)getSqlJniErrorStr()); goto non_cli_error_return; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/ExExeUtilGet.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilGet.cpp b/core/sql/executor/ExExeUtilGet.cpp index 163b190..f4d0ade 100644 --- a/core/sql/executor/ExExeUtilGet.cpp +++ b/core/sql/executor/ExExeUtilGet.cpp @@ -7733,26 +7733,12 @@ else return rc; //EOD of LOB data file - - hdfsFS fs = currContext->getHdfsServerConnection((char*)getLItdb().getHdfsServer(),getLItdb().getHdfsPort()); - if (fs == NULL) - return LOB_DATA_FILE_OPEN_ERROR; - - snprintf(lobDataFilePath, LOBINFO_MAX_FILE_LEN, "%s/%s", lobLocation, lobDataFile); - hdfsFile fdData = hdfsOpenFile(fs, lobDataFilePath,O_RDONLY,0,0,0); - if (!fdData) - { - hdfsCloseFile(fs,fdData); - fdData = NULL; - return LOB_DATA_FILE_OPEN_ERROR; - } - hdfsFileInfo *fInfo = hdfsGetPathInfo(fs, lobDataFilePath); - if (fInfo) - lobEOD = fInfo->mSize; - else - lobEOD = 0; - + HDFS_Client_RetCode hdfsClientRetcode; + lobEOD = HdfsClient::hdfsSize(lobDataFilePath, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_OPEN_ERROR; + str_sprintf(buf, " LOB EOD : %ld", lobEOD); if (moveRowToUpQueue(buf, strlen(buf), &rc)) return rc; @@ -8034,24 +8020,12 @@ short ExExeUtilLobInfoTableTcb::collectLobInfo(char * tableName,Int32 currLobNum str_cpy_all(lobInfo_->lobDataFile, lobDataFile,strlen(lobDataFile)); } //EOD of LOB data file - // hdfsFS fs = hdfsConnect(getLItdb().getHdfsServer(),getLItdb().getHdfsPort()); - hdfsFS fs = 
currContext->getHdfsServerConnection((char*)getLItdb().getHdfsServer(),getLItdb().getHdfsPort()); - if (fs == NULL) - return LOB_DATA_FILE_OPEN_ERROR; - snprintf(lobDataFilePath, LOBINFO_MAX_FILE_LEN, "%s/%s", lobLocation, lobDataFile); - hdfsFile fdData = hdfsOpenFile(fs, lobDataFilePath,O_RDONLY,0,0,0); - if (!fdData) - { - hdfsCloseFile(fs,fdData); - fdData = NULL; - return LOB_DATA_FILE_OPEN_ERROR; - } - hdfsFileInfo *fInfo = hdfsGetPathInfo(fs, lobDataFilePath); - if (fInfo) - lobEOD = fInfo->mSize; - else - lobEOD = 0; + HDFS_Client_RetCode hdfsClientRetcode; + lobEOD = HdfsClient::hdfsSize(lobDataFilePath, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_OPEN_ERROR; + lobInfo_->lobDataFileSizeEod=lobEOD; // Sum of all the lobDescChunks for used space http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/ExExeUtilLoad.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index 160a6bb..235293c 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -3052,7 +3052,7 @@ short ExExeUtilLobExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelect", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3098,7 +3098,7 @@ short ExExeUtilLobExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3151,7 +3151,7 @@ short ExExeUtilLobExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor", - 
getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3219,7 +3219,7 @@ short ExExeUtilLobExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3507,7 +3507,7 @@ short ExExeUtilLobUpdateTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceUpdate", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3590,7 +3590,7 @@ short ExExeUtilLobUpdateTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceUpdate", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3675,7 +3675,7 @@ short ExExeUtilLobUpdateTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceUpdate", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3848,7 +3848,7 @@ short ExExeUtilFileExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor/open", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3894,7 +3894,7 @@ short ExExeUtilFileExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor/read", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -3949,7 +3949,7 @@ short 
ExExeUtilFileExtractTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor/close", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -4084,7 +4084,7 @@ short ExExeUtilFileLoadTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceCreate", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -4199,7 +4199,7 @@ short ExExeUtilFileLoadTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceInsert", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } @@ -4229,7 +4229,7 @@ short ExExeUtilFileLoadTcb::work() ExRaiseSqlError(getHeap(), &diagsArea_, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceCloseFile", - getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); step_ = HANDLE_ERROR_; break; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/HdfsClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp index 72157bf..930b7f4 100644 --- a/core/sql/executor/HdfsClient_JNI.cpp +++ b/core/sql/executor/HdfsClient_JNI.cpp @@ -332,6 +332,8 @@ static const char* const hdfsClientErrorEnumStr[] = ,"Java exception in HdfsClient::hdfsOpen()." ,"JNI NewStringUTF() in HdfsClient::hdfsWrite()." ,"Java exception in HdfsClient::hdfsWrite()." + ,"JNI NewStringUTF() in HdfsClient::hdfsWriteImmediate()." + ,"Java exception in HdfsClient::hdfsWriteImmediate()." ,"Error in HdfsClient::hdfsRead()." ,"Java exception in HdfsClient::hdfsRead()." 
,"Java exception in HdfsClient::hdfsClose()." @@ -353,6 +355,10 @@ static const char* const hdfsClientErrorEnumStr[] = ,"Buffer is small in HdfsClient::getFsDefaultName()." ,"Error in HdfsClient::hdfsCreateDirectory()." ,"Java exception in HdfsClient::hdfsCreateDirectory()." + ,"Error in HdfsClient::hdfsRename()." + ,"Java exception in HdfsClient::hdfsRename()." + ,"Error in HdfsClient::hdfsSize()." + ,"Java exception in HdfsClient::hdfsSize()." }; ////////////////////////////////////////////////////////////////////////////// @@ -416,15 +422,10 @@ HdfsClient* HdfsClient::getInstance() return hdfsClient; } -void HdfsClient::deleteInstance() +void HdfsClient::deleteInstance(HdfsClient *hdfsClient) { - ContextCli *currContext = GetCliGlobals()->currContext(); - HdfsClient *hdfsClient = currContext->getHDFSClient(); - if (hdfsClient != NULL) { - NAHeap *heap = currContext->exHeap(); - NADELETE(hdfsClient, HdfsClient, heap); - currContext->setHDFSClient(NULL); - } + hdfsClient->hdfsClose(); + NADELETE(hdfsClient, HdfsClient, hdfsClient->getHeap()); } HDFS_Client_RetCode HdfsClient::init() @@ -452,8 +453,10 @@ HDFS_Client_RetCode HdfsClient::init() JavaMethods_[JM_HDFS_OPEN ].jm_signature = "(Ljava/lang/String;Z)Z"; JavaMethods_[JM_HDFS_WRITE ].jm_name = "hdfsWrite"; JavaMethods_[JM_HDFS_WRITE ].jm_signature = "([B)I"; + JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_name = "hdfsWriteImmediate"; + JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_signature = "([B)J"; JavaMethods_[JM_HDFS_READ ].jm_name = "hdfsRead"; - JavaMethods_[JM_HDFS_READ ].jm_signature = "(Ljava/nio/ByteBuffer;)I"; + JavaMethods_[JM_HDFS_READ ].jm_signature = "(JLjava/nio/ByteBuffer;)I"; JavaMethods_[JM_HDFS_CLOSE ].jm_name = "hdfsClose"; JavaMethods_[JM_HDFS_CLOSE ].jm_signature = "()Z"; JavaMethods_[JM_HDFS_MERGE_FILES].jm_name = "hdfsMergeFiles"; @@ -472,6 +475,12 @@ HDFS_Client_RetCode HdfsClient::init() JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_signature = "()Ljava/lang/String;"; 
JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_name = "hdfsCreateDirectory"; JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_signature = "(Ljava/lang/String;)Z"; + JavaMethods_[JM_HDFS_RENAME].jm_name = "hdfsRename"; + JavaMethods_[JM_HDFS_RENAME].jm_signature = "(Ljava/lang/String;Ljava/lang/String;)Z"; + JavaMethods_[JM_HDFS_SIZE].jm_name = "hdfsSize"; + JavaMethods_[JM_HDFS_SIZE].jm_signature = "()J"; + JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_name = "hdfsSize"; + JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_signature = "(Ljava/lang/String;)J"; rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_); if (rc == HDFS_CLIENT_OK) javaMethodsInitialized_ = TRUE; @@ -551,7 +560,7 @@ HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, NABoolean compress) if (initJNIEnv() != JOI_OK) return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM; - + setPath(path); jstring js_path = jenv_->NewStringUTF(path); if (js_path == NULL) { GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM)); @@ -587,8 +596,37 @@ HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, NABoolean compress) return HDFS_CLIENT_OK; } +Int64 HdfsClient::hdfsSize(HDFS_Client_RetCode &hdfsClientRetcode) +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsSize() called."); + + if (initJNIEnv() != JOI_OK) { + hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_PARAM; + return -1; + } -Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode) + if (hdfsStats_ != NULL) + hdfsStats_->getHdfsTimer().start(); + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_SIZE].jm_full_name; + jlong jresult = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_HDFS_SIZE].methodID); + if (hdfsStats_ != NULL) { + hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); + hdfsStats_->incHdfsCalls(); + } + + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsSize()"); + 
jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_EXCEPTION; + return -1; + } + hdfsClientRetcode = HDFS_CLIENT_OK; + return jresult; +} + + +Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len); @@ -598,8 +636,9 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hd } Int64 lenRemain = len; Int64 writeLen; - Int64 chunkLen = (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0); + Int64 chunkLen = (maxChunkSize > 0 ? maxChunkSize : (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0)); Int64 offset = 0; + jint bytesWritten; do { if ((chunkLen > 0) && (lenRemain > chunkLen)) @@ -621,7 +660,7 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hd tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name; // Java method returns the cumulative bytes written - jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray); + bytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray); if (hdfsStats_ != NULL) { hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); @@ -639,12 +678,70 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hd } while (lenRemain > 0); jenv_->PopLocalFrame(NULL); hdfsClientRetcode = HDFS_CLIENT_OK; - return len; + return bytesWritten; } -Int32 HdfsClient::hdfsRead(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode) +Int64 HdfsClient::hdfsWriteImmediate(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize) { - QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len); + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWriteImmediate(%ld) called.", len); + + if (initJNIEnv() != JOI_OK) { + hdfsClientRetcode = 
HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION; + return 0; + } + Int64 lenRemain = len; + Int64 writeLen; + Int64 chunkLen = (maxChunkSize > 0 ? maxChunkSize : (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0)); + Int64 offset = 0; + jlong writeOffset = -1; + jlong chunkWriteOffset; + do + { + if ((chunkLen > 0) && (lenRemain > chunkLen)) + writeLen = chunkLen; + else + writeLen = lenRemain; + //Write the requisite bytes into the file + jbyteArray jbArray = jenv_->NewByteArray(writeLen); + if (!jbArray) { + GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM)); + jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM; + return 0; + } + jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const jbyte*)(data+offset)); + + if (hdfsStats_ != NULL) + hdfsStats_->getHdfsTimer().start(); + + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_full_name; + // Java method returns the cumulative bytes written + chunkWriteOffset = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].methodID, jbArray); + if (writeOffset == -1) + writeOffset = chunkWriteOffset; + + if (hdfsStats_ != NULL) { + hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); + hdfsStats_->incHdfsCalls(); + } + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()"); + jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION; + return 0; + } + lenRemain -= writeLen; + offset += writeLen; + } while (lenRemain > 0); + jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_OK; + return writeOffset; +} + +Int32 HdfsClient::hdfsRead(Int64 pos, const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode) +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsRead(%ld) called.", len); if (initJNIEnv() != JOI_OK) { hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION; @@ -661,7 
+758,8 @@ Int32 HdfsClient::hdfsRead(const char* data, Int64 len, HDFS_Client_RetCode &hdf tsRecentJMFromJNI = JavaMethods_[JM_HDFS_READ].jm_full_name; jint bytesRead = 0; - bytesRead = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_READ].methodID, j_buf); + jlong j_pos = pos; + bytesRead = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_READ].methodID, j_pos, j_buf); if (hdfsStats_ != NULL) { hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop()); @@ -714,6 +812,38 @@ HDFS_Client_RetCode HdfsClient::hdfsClose() return HDFS_CLIENT_OK; } +Int64 HdfsClient::hdfsSize(const char *filename, HDFS_Client_RetCode &hdfsClientRetcode) +{ + QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsSize() called."); + + if (initJNIEnv() != JOI_OK) { + hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_PARAM; + return -1; + } + if (getInstance() == NULL) + return HDFS_CLIENT_ERROR_SIZE_PARAM; + + jstring j_filename = jenv_->NewStringUTF(filename); + if (j_filename == NULL) { + GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_SIZE_PARAM)); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_SIZE_PARAM; + } + + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_full_name; + jlong jresult = jenv_->CallStaticLongMethod(javaClass_, JavaMethods_[JM_HDFS_SIZE_FOR_FILE].methodID, j_filename); + + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsSize()"); + jenv_->PopLocalFrame(NULL); + hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_EXCEPTION; + return -1; + } + hdfsClientRetcode = HDFS_CLIENT_OK; + return jresult; +} + HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath) { QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) called.", @@ -978,7 +1108,47 @@ HDFS_Client_RetCode HdfsClient::hdfsCreateDirectory(const NAString &dirName) { logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreateDirectory()", getLastError()); jenv_->PopLocalFrame(NULL); - return 
HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION; + return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION; + } + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_OK; +} + +HDFS_Client_RetCode HdfsClient::hdfsRename(const NAString &fromPath, const NAString &toPath) +{ + QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::hdfsRename() called."); + if (initJNIEnv() != JOI_OK) + return HDFS_CLIENT_ERROR_RENAME_PARAM; + if (getInstance() == NULL) + return HDFS_CLIENT_ERROR_RENAME_PARAM; + + jstring js_fromPath = jenv_->NewStringUTF(fromPath.data()); + if (js_fromPath == NULL) { + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_RENAME_PARAM; + } + + jstring js_toPath = jenv_->NewStringUTF(toPath.data()); + if (js_toPath == NULL) { + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_RENAME_PARAM; + } + + tsRecentJMFromJNI = JavaMethods_[JM_HDFS_RENAME].jm_full_name; + jstring jresult = + (jstring)jenv_->CallStaticObjectMethod(javaClass_, + JavaMethods_[JM_HDFS_RENAME].methodID, js_fromPath, js_toPath); + if (jenv_->ExceptionCheck()) + { + getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsRename()"); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_RENAME_EXCEPTION; + } + if (jresult == false) + { + logError(CAT_SQL_HDFS, "HdfsClient::hdfsRename()", getLastError()); + jenv_->PopLocalFrame(NULL); + return HDFS_CLIENT_ERROR_RENAME_EXCEPTION; } jenv_->PopLocalFrame(NULL); return HDFS_CLIENT_OK; http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/HdfsClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h index 888451c..b4ef741 100644 --- a/core/sql/executor/HdfsClient_JNI.h +++ b/core/sql/executor/HdfsClient_JNI.h @@ -129,6 +129,8 @@ typedef enum { ,HDFS_CLIENT_ERROR_HDFS_OPEN_EXCEPTION ,HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM ,HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION + 
,HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM + ,HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION ,HDFS_CLIENT_ERROR_HDFS_READ_PARAM ,HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION ,HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION @@ -151,6 +153,10 @@ typedef enum { ,HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_BUFFER_TOO_SMALL ,HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM ,HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION + ,HDFS_CLIENT_ERROR_RENAME_PARAM + ,HDFS_CLIENT_ERROR_RENAME_EXCEPTION + ,HDFS_CLIENT_ERROR_SIZE_PARAM + ,HDFS_CLIENT_ERROR_SIZE_EXCEPTION ,HDFS_CLIENT_LAST } HDFS_Client_RetCode; @@ -171,7 +177,7 @@ public: ~HdfsClient(); static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, int hdfsIoByteArraySizeInKB = 0); static HdfsClient *getInstance(); - static void deleteInstance(); + static void deleteInstance(HdfsClient *hdfsClient); void setIoByteArraySize(int size) { ioByteArraySizeInKB_ = size; } @@ -182,13 +188,16 @@ public: HDFS_Client_RetCode init(); HDFS_Client_RetCode hdfsCreate(const char* path, NABoolean overwrite, NABoolean compress); HDFS_Client_RetCode hdfsOpen(const char* path, NABoolean compress); - Int32 hdfsWrite(const char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode); - Int32 hdfsRead(const char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode); + Int64 hdfsSize(HDFS_Client_RetCode &hdfsClientRetcode); + Int32 hdfsWrite(const char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize = 0); + Int64 hdfsWriteImmediate(const char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize = 0); + Int32 hdfsRead(Int64 offset, const char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode); HDFS_Client_RetCode hdfsClose(); HDFS_Client_RetCode setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint fileNo, jboolean isDir, jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, jstring owner, jstring group, jshort permissions, jlong 
accessTime); HDFS_Client_RetCode hdfsListDirectory(const char *pathStr, HDFS_FileInfo **hdfsFileInfo, int *numFiles); + static Int64 hdfsSize(const char *filename, HDFS_Client_RetCode &hdfsClientRetcode); static HDFS_Client_RetCode hdfsMergeFiles(const NAString& srcPath, const NAString& dstPath); static HDFS_Client_RetCode hdfsCleanUnloadPath(const NAString& uldPath ); static HDFS_Client_RetCode hdfsExists(const NAString& uldPath, NABoolean & exists ); @@ -199,6 +208,7 @@ public: // buf_len is the length of the buffer in bytes static HDFS_Client_RetCode getFsDefaultName(char* buffer, Int32 buf_len); static HDFS_Client_RetCode hdfsCreateDirectory(const NAString& path); + static HDFS_Client_RetCode hdfsRename(const NAString& fromPath, const NAString& toPath); private: enum JAVA_METHODS { @@ -206,6 +216,7 @@ private: JM_HDFS_CREATE, JM_HDFS_OPEN, JM_HDFS_WRITE, + JM_HDFS_WRITE_IMMEDIATE, JM_HDFS_READ, JM_HDFS_CLOSE, JM_HDFS_MERGE_FILES, @@ -216,6 +227,9 @@ private: JM_HIVE_TBL_MAX_MODIFICATION_TS, JM_GET_FS_DEFAULT_NAME, JM_HDFS_CREATE_DIRECTORY, + JM_HDFS_RENAME, + JM_HDFS_SIZE, + JM_HDFS_SIZE_FOR_FILE, JM_LAST }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/ex_globals.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ex_globals.cpp b/core/sql/executor/ex_globals.cpp index df7e9ef..df2ca67 100644 --- a/core/sql/executor/ex_globals.cpp +++ b/core/sql/executor/ex_globals.cpp @@ -135,7 +135,7 @@ void ex_globals::deleteMe(NABoolean fatalError) statsArea_ = NULL; cleanupTcbs(); tcbList_.deallocate(); - ExpLOBinterfaceCleanup(exLobGlobals_); + NADELETE(exLobGlobals_, ExLobGlobals, exLobGlobals_->getHeap()); exLobGlobals_ = NULL; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOB.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOB.cpp b/core/sql/exp/ExpLOB.cpp index 10b2bc1..fe09b43 100644 --- 
a/core/sql/exp/ExpLOB.cpp +++ b/core/sql/exp/ExpLOB.cpp @@ -860,7 +860,7 @@ else ExRaiseSqlError(h, diagsArea, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceInsert", - (char*)"ExpLOBInterfaceInsert",getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); return ex_expr::EXPR_ERROR; } @@ -1010,7 +1010,7 @@ ex_expr::exp_return_type ExpLOBiud::insertData(Lng32 handleLen, ExRaiseSqlError(h, diagsArea, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceInsert", - (char*)"ExpLOBInterfaceInsert",getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); return ex_expr::EXPR_ERROR; } @@ -1166,7 +1166,7 @@ ex_expr::exp_return_type ExpLOBdelete::eval(char *op_data[], ExRaiseSqlError(h, diagsArea, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceDelete", - (char*)"ExpLOBInterfaceDelete",getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); return ex_expr::EXPR_ERROR; } @@ -1472,7 +1472,7 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], ExRaiseSqlError(h, diagsArea, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceUpdate", - (char*)"ExpLOBInterfaceUpdate",getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); return ex_expr::EXPR_ERROR; } @@ -1611,7 +1611,7 @@ ex_expr::exp_return_type ExpLOBconvert::eval(char *op_data[], ExRaiseSqlError(h, diagsArea, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelect", - (char*)"ExpLOBInterfaceSelect",getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); return ex_expr::EXPR_ERROR; } if(toFile()) @@ -1669,7 +1669,7 @@ ex_expr::exp_return_type ExpLOBconvert::eval(char *op_data[], ExRaiseSqlError(h, diagsArea, (ExeErrorCode)(8442), NULL, &intParam1, &cliError, NULL, (char*)"ExpLOBInterfaceSelect", - 
(char*)"ExpLOBInterfaceSelect",getLobErrStr(intParam1)); + getLobErrStr(intParam1), (char*)getSqlJniErrorStr()); return ex_expr::EXPR_ERROR; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOBaccess.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp index a804959..3b4cd93 100644 --- a/core/sql/exp/ExpLOBaccess.cpp +++ b/core/sql/exp/ExpLOBaccess.cpp @@ -85,7 +85,9 @@ ExLob::ExLob(NAHeap * heap) : fs_(NULL), fdData_(NULL), openFlags_(0), - lobTrace_(FALSE) + lobTrace_(FALSE), + useLibHdfs_(FALSE), + hdfsClient_(NULL) { // nothing else to do } @@ -97,6 +99,10 @@ ExLob::~ExLob() hdfsCloseFile(fs_, fdData_); fdData_ = NULL; } + if (hdfsClient_ != NULL) { + HdfsClient::deleteInstance(hdfsClient_); + hdfsClient_ = NULL; + } } @@ -113,6 +119,7 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode, struct timespec endTime; Int64 secs, nsecs, totalnsecs; + useLibHdfs_ = lobGlobals->useLibHdfs_; if (lobStorageLocation) { if (lobStorageLocation_.empty()) @@ -148,12 +155,21 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode, hdfsPort_ = hdfsPort; // lobLocation_ = lobLocation; clock_gettime(CLOCK_MONOTONIC, &startTime); - - if (lobGlobals->getHdfsFs() == NULL) - return LOB_HDFS_CONNECT_ERROR; - else - fs_ = lobGlobals->getHdfsFs(); - + lobGlobalHeap_ = lobGlobals->getHeap(); + HDFS_Client_RetCode hdfsClientRetcode; + if (useLibHdfs_) { + if (lobGlobals->getHdfsFs() == NULL) + return LOB_HDFS_CONNECT_ERROR; + else + fs_ = lobGlobals->getHdfsFs(); + hdfsClient_ = NULL; + } + else { + hdfsClient_ = HdfsClient::newInstance(lobGlobalHeap_, NULL, hdfsClientRetcode); + fs_ = NULL; + if (hdfsClient_ == NULL) + return LOB_HDFS_CONNECT_ERROR; + } clock_gettime(CLOCK_MONOTONIC, &endTime); @@ -166,7 +182,22 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode, } totalnsecs = (secs * NUM_NSECS_IN_SEC) + 
nsecs; stats_.hdfsConnectionTime += totalnsecs; - + + if (! useLibHdfs_) { + if (mode == EX_LOB_CREATE) { + hdfsClientRetcode = hdfsClient_->hdfsCreate(lobDataFile_.data(), FALSE, FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_CREATE_ERROR; + } + else { + hdfsClientRetcode = hdfsClient_->hdfsOpen(lobDataFile_.data(), FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_OPEN_ERROR; + } + fdData_ = NULL; + } + else + { if (mode == EX_LOB_CREATE) { // check if file is already created @@ -186,7 +217,7 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, Ex_Lob_Mode mode, fdData_ = NULL; } - lobGlobalHeap_ = lobGlobals->getHeap(); + } return LOB_OPER_OK; } @@ -297,6 +328,16 @@ Ex_Lob_Error ExLob::getDesc(ExLobDesc &desc,char * handleIn, Int32 handleInLen, Ex_Lob_Error ExLob::writeData(Int64 offset, char *data, Int32 size, Int64 &operLen) { Ex_Lob_Error err; + HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK; + Int64 writeOffset; + + if (! useLibHdfs_ ) { + writeOffset = hdfsClient_->hdfsWriteImmediate(data, size, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_WRITE_ERROR; + operLen = size; + return LOB_OPER_OK; + } lobDebugInfo("In ExLob::writeData",0,__LINE__,lobTrace_); if (!fdData_ || (openFlags_ != (O_WRONLY | O_APPEND))) // file is not open for write { @@ -329,7 +370,8 @@ Ex_Lob_Error ExLob::writeDataSimple(char *data, Int64 size, LobsSubOper subOpera int bufferSize , short replication , int blockSize) { Ex_Lob_Error err; - + if (! useLibHdfs_) + return writeData(0,data, size, operLen); if (!fdData_ || (openFlags_ != (O_WRONLY | O_APPEND))) // file is not open for write { @@ -424,7 +466,14 @@ Ex_Lob_Error ExLob::emptyDirectory(char *dirPath, ExLobGlobals *lobGlobals) { int retcode = 0; - + HDFS_Client_RetCode hdfsClientRetcode; + if (! 
useLibHdfs_) { + hdfsClientRetcode = HdfsClient::hdfsDeletePath(dirPath); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_DELETE_ERROR; + return LOB_OPER_OK; + } + hdfsFileInfo *fileInfos = hdfsGetPathInfo(fs_, dirPath); if (fileInfos == NULL) { @@ -521,8 +570,16 @@ Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 &sourceEOF) lobDebugInfo("In ExLob::statSourceFile",0,__LINE__,lobTrace_); // check if the source file is a hdfs file or from local file system. LobInputOutputFileType srcType = fileType(srcfile); + HDFS_Client_RetCode hdfsClientRetcode; if (srcType == HDFS_FILE) { + if (! useLibHdfs_) { + sourceEOF = HdfsClient::hdfsSize(srcfile, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_SOURCE_FILE_OPEN_ERROR; + ex_assert(sourceEOF >= 0, "Offset is -1 possibly due to path being directory"); + } + else { hdfsFile sourceFile = hdfsOpenFile(fs_,srcfile,O_RDONLY,0,0,0); if (!sourceFile) return LOB_SOURCE_FILE_OPEN_ERROR; @@ -536,8 +593,8 @@ Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 &sourceEOF) str_sprintf(logBuf,"Returning EOF of %ld for file %s", sourceEOF,srcfile); lobDebugInfo(logBuf, 0,__LINE__,lobTrace_); + } } - else if (srcType == LOCAL_FILE) { int openFlags = O_RDONLY; @@ -554,11 +611,7 @@ Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 &sourceEOF) if (stat(srcfile, &statbuf) != 0) { return LOB_SOURCE_FILE_STAT_ERROR; } - sourceEOF = statbuf.st_size; - - - flock(fdSrcFile, LOCK_UN); close(fdSrcFile); str_sprintf(logBuf,"Returning EOF of %ld for file %s", sourceEOF,srcfile); @@ -638,8 +691,28 @@ Ex_Lob_Error ExLob::readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &si char logBuf[4096]; str_sprintf(logBuf,"Calling ::readHdfsSourceFile: %s Offset:%ld, Size: %d",srcfile, offset,size); lobDebugInfo(logBuf, 0,__LINE__,lobTrace_); - - + HDFS_Client_RetCode hdfsClientRetcode; + Int64 bytesRead; + if (!useLibHdfs_) { + HdfsClient *srcHdfsClient = 
HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); + hdfsClientRetcode = srcHdfsClient->hdfsOpen(srcfile, FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_SOURCE_FILE_OPEN_ERROR; + fileData = (char *) (getLobGlobalHeap())->allocateMemory(size); + if (fileData == (char *)-1) + return LOB_SOURCE_DATA_ALLOC_ERROR; + bytesRead = srcHdfsClient->hdfsRead(offset, fileData, size, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + HdfsClient::deleteInstance(srcHdfsClient); + getLobGlobalHeap()->deallocateMemory(fileData); + return LOB_SOURCE_FILE_READ_ERROR; + } + size = bytesRead; + // Memory growth/leak + HdfsClient::deleteInstance(srcHdfsClient); + return LOB_OPER_OK; + } int openFlags = O_RDONLY; hdfsFile fdSrcFile = hdfsOpenFile(fs_,srcfile, openFlags,0,0,0); if (fdSrcFile == NULL) @@ -662,6 +735,7 @@ Ex_Lob_Error ExLob::readHdfsSourceFile(char *srcfile, char *&fileData, Int32 &si return LOB_OPER_OK; } + Ex_Lob_Error ExLob::readLocalSourceFile(char *srcfile, char *&fileData, Int32 &size, Int64 offset) { char logBuf[4096]; @@ -952,11 +1026,14 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper subO Ex_Lob_Error err=LOB_OPER_OK; char logBuf[4096]; lobDebugInfo("In ExLob::writeLobData",0,__LINE__,lobTrace_); + HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK; + Int64 writeOffset; + char *inputAddr = source; Int64 readOffset = 0; Int32 allocMemSize = 0; Int64 inputSize = sourceLen; - Int64 writeOffset = tgtOffset; + writeOffset = tgtOffset; if (subOperation == Lob_External_File) return LOB_OPER_OK; while(inputSize > 0) @@ -1003,8 +1080,10 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 sourceLen, LobsSubOper subO } } lobDebugInfo("Leaving ExLob::writeLobData",0,__LINE__,lobTrace_); - hdfsCloseFile(fs_, fdData_); - fdData_=NULL; + if (useLibHdfs_) { + hdfsCloseFile(fs_, 
fdData_); + fdData_=NULL; + } return err; } @@ -1439,6 +1518,13 @@ Ex_Lob_Error ExLob::delDesc(char *handleIn, Int32 handleInLen, Int64 transId) Ex_Lob_Error ExLob::purgeLob() { char logBuf[4096]; + if (! useLibHdfs_) { + HDFS_Client_RetCode hdfsClientRetcode; + hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobDataFile_.data()); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_DELETE_ERROR; + return LOB_OPER_OK; + } if (hdfsDelete(fs_, lobDataFile_.data(), 0) != 0) { // extract a substring small enough to fit into logBuf @@ -1693,6 +1779,26 @@ Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 &dataOffset, char logBuf[4096]; lobDebugInfo("In ExLob::allocateDesc",0,__LINE__,lobTrace_); Int32 openFlags = O_RDONLY ; + HDFS_Client_RetCode hdfsClientRetcode; + + if (! useLibHdfs_) { + if (size == 0) { + hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobDataFile_.data()); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_WRITE_ERROR; + else { + dataOffset = 0; + return LOB_OPER_OK; + } + } + else { + dataOffset = hdfsClient_->hdfsSize(hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_FILE_WRITE_ERROR; + ex_assert(dataOffset >= 0, "Offset is -1 possibly due to path being directory"); + return LOB_OPER_OK; + } + } if (size == 0) //we are trying to empty this lob. 
{ //rename lob datafile @@ -1793,12 +1899,13 @@ Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 &descNum, Int64 &dataOffset, return LOB_OPER_OK; } + Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int32 numEntries) { Ex_Lob_Error rc = LOB_OPER_OK; char logBuf[4096]; lobDebugInfo("In ExLob::compactLobDataFile",0,__LINE__,lobTrace_); - Int64 maxMemChunk = 100*1024*1024; //100 MB limit for intermediate buffer for transfering data + Int64 maxMemChunk = 64*1024*1024; //64 MB limit for intermediate buffer for transfering data // make some temporary file names size_t len = lobDataFile_.length(); @@ -1827,14 +1934,15 @@ Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int lobDataFileSubstr,tmpLobDataFileSubstr, saveLobDataFileSubstr); lobDebugInfo(logBuf,0,__LINE__,lobTrace_); - hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_); - if (fs == NULL) - return LOB_DATA_FILE_OPEN_ERROR; - - - hdfsFile fdData = hdfsOpenFile(fs, lobDataFile_.data(), O_RDONLY, 0, 0,0); + + HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK; + HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); + HdfsClient *dstHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); - if (!fdData) + hdfsClientRetcode = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { // extract substring small enough to fit in logBuf len = MINOF(lobDataFile_.length(),sizeof(logBuf) - 40); @@ -1844,14 +1952,11 @@ Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int str_sprintf(logBuf,"Could not open file:%s",lobDataFileSubstr2); lobDebugInfo(logBuf,0,__LINE__,lobTrace_); - hdfsCloseFile(fs,fdData); - fdData 
= NULL; return LOB_DATA_FILE_OPEN_ERROR; } - - hdfsFile fdTemp = hdfsOpenFile(fs, tmpLobDataFile,O_WRONLY|O_CREAT,0,0,0); - if (!fdTemp) + hdfsClientRetcode = dstHdfsClient->hdfsCreate(tmpLobDataFile, TRUE, FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { // extract substring small enough to fit in logBuf len = MINOF(sizeof(tmpLobDataFile),sizeof(logBuf)/3 - 20); @@ -1861,8 +1966,8 @@ Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int str_sprintf(logBuf,"Could not open file:%s",tmpLobDataFileSubstr2); lobDebugInfo(logBuf,0,__LINE__,lobTrace_); - hdfsCloseFile(fs,fdTemp); - fdTemp = NULL; + srcHdfsClient->hdfsClose(); + HdfsClient::deleteInstance(srcHdfsClient); return LOB_DATA_FILE_OPEN_ERROR; } @@ -1871,112 +1976,86 @@ Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int Int64 bytesWritten = 0; Int64 size = 0; Int64 chunkLen = 0; + Int64 readLen = 0; + Int64 offset; char * tgt = NULL; - while (i < numEntries) - { - chunkLen = dcArray[i].getChunkLen(); - if (chunkLen > maxMemChunk) - { - tgt = (char *)(getLobGlobalHeap())->allocateMemory(maxMemChunk); - while (chunkLen > maxMemChunk) - { - bytesRead = hdfsPread(fs,fdData,dcArray[i].getCurrentOffset(),tgt,maxMemChunk); - if (bytesRead != maxMemChunk) - { - lobDebugInfo("Problem reading from data file",0,__LINE__,lobTrace_); - getLobGlobalHeap()->deallocateMemory(tgt); - return LOB_DATA_READ_ERROR; - } - bytesWritten = hdfsWrite(fs,fdTemp, tgt,maxMemChunk); - if (bytesWritten != size) - { - lobDebugInfo("Problem writing temp data file",0,__LINE__,lobTrace_); - getLobGlobalHeap()->deallocateMemory(tgt); - return LOB_TARGET_FILE_WRITE_ERROR; - } - chunkLen -= maxMemChunk; - } - - } - else - { - tgt = (char *)(getLobGlobalHeap())->allocateMemory(chunkLen); - bytesRead = hdfsPread(fs,fdData,dcArray[i].getCurrentOffset(),tgt,chunkLen); - if (bytesRead != chunkLen) - { - lobDebugInfo("Problem reading from data file",0,__LINE__,lobTrace_); - 
getLobGlobalHeap()->deallocateMemory(tgt); - return LOB_DATA_READ_ERROR; - } - bytesWritten = hdfsWrite(fs,fdTemp, tgt,chunkLen); - if (bytesWritten != chunkLen) - { - lobDebugInfo("Problem writing to temp data file",0,__LINE__,lobTrace_); - getLobGlobalHeap()->deallocateMemory(tgt); - return LOB_TARGET_FILE_WRITE_ERROR; - } - } - if (hdfsFlush(fs, fdTemp)) { - lobDebugInfo("Problem flushing to temp data file",0,__LINE__,lobTrace_); - return LOB_DATA_FLUSH_ERROR; + Ex_Lob_Error saveError = LOB_OPER_OK; + tgt = (char *)(getLobGlobalHeap())->allocateMemory(maxMemChunk); + while ((i < numEntries) && (saveError == LOB_OPER_OK)) + { + readLen = dcArray[i].getChunkLen(); + offset = dcArray[i].getCurrentOffset(); + while (readLen > 0) + { + if (readLen > maxMemChunk) + chunkLen = maxMemChunk; + else + chunkLen = readLen; + bytesRead = srcHdfsClient->hdfsRead(offset, tgt, chunkLen, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + lobDebugInfo("Problem reading from data file",0,__LINE__,lobTrace_); + saveError = LOB_SOURCE_FILE_READ_ERROR; + break; + } + bytesWritten = dstHdfsClient->hdfsWrite(tgt, chunkLen, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK || bytesWritten != chunkLen) { + lobDebugInfo("Problem writing temp data file",0,__LINE__,lobTrace_); + saveError = LOB_DATA_FILE_WRITE_ERROR; + break; + } + readLen -= chunkLen; + offset += chunkLen; } - getLobGlobalHeap()->deallocateMemory(tgt); i++; - } - hdfsCloseFile(fs,fdTemp); - hdfsCloseFile(fs,fdData); - + } + getLobGlobalHeap()->deallocateMemory(tgt); + srcHdfsClient->hdfsClose(); + dstHdfsClient->hdfsClose(); + HdfsClient::deleteInstance(srcHdfsClient); + HdfsClient::deleteInstance(dstHdfsClient); + if (saveError != LOB_OPER_OK) + return saveError; //Now save the data file and rename the tempfile to the original datafile - Int32 rc2 = hdfsRename(fs,lobDataFile_.data(),saveLobDataFile); - if (rc2 == -1) - { + hdfsClientRetcode = 
HdfsClient::hdfsRename(lobDataFile_.data(),saveLobDataFile); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { lobDebugInfo("Problem renaming datafile to save data file",0,__LINE__,lobTrace_); return LOB_DATA_FILE_WRITE_ERROR; - } - rc2 = hdfsRename(fs,tmpLobDataFile, lobDataFile_.data()); - if (rc2 == -1) - { + } + hdfsClientRetcode = HdfsClient::hdfsRename(tmpLobDataFile, lobDataFile_.data()); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { lobDebugInfo("Problem renaming temp datafile to data file",0,__LINE__,lobTrace_); return LOB_DATA_FILE_WRITE_ERROR; - } + } return LOB_OPER_OK; } Ex_Lob_Error ExLob::restoreLobDataFile() { - Ex_Lob_Error rc = LOB_OPER_OK; lobDebugInfo("In ExLob::restoreLobDataFile",0,__LINE__,lobTrace_); - - hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_); - if (fs == NULL) - return LOB_DATA_FILE_OPEN_ERROR; + HDFS_Client_RetCode hdfsClientRetcode; char saveLobDataFile[lobDataFile_.length() + sizeof("_save")]; // sizeof includes room for null terminator strcpy(saveLobDataFile,lobDataFile_.data()); strcpy(saveLobDataFile+lobDataFile_.length(),"_save"); - Int32 rc2 = hdfsDelete(fs,lobDataFile_.data(),FALSE);//ok to ignore error. 
- rc2 = hdfsRename(fs,saveLobDataFile, lobDataFile_.data()); - if (rc2) + hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobDataFile_.data()); + hdfsClientRetcode = HdfsClient::hdfsRename(saveLobDataFile, lobDataFile_.data()); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { lobDebugInfo("Problem renaming savedatafile to data file",0,__LINE__,lobTrace_); return LOB_OPER_ERROR; } - return rc; + return LOB_OPER_OK; } Ex_Lob_Error ExLob::purgeBackupLobDataFile() { Ex_Lob_Error rc = LOB_OPER_OK; - lobDebugInfo("In ExLob::purgeBackupLobDataFile",0,__LINE__,lobTrace_); - hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_); - if (fs == NULL) - return LOB_DATA_FILE_OPEN_ERROR; + lobDebugInfo("In ExLob::purgeBackupLobDataFile",0,__LINE__,lobTrace_); char saveLobDataFile[lobDataFile_.length() + sizeof("_save")]; // sizeof includes room for null terminator strcpy(saveLobDataFile,lobDataFile_.data()); strcpy(saveLobDataFile+lobDataFile_.length(),"_save"); - Int32 rc2 = hdfsDelete(fs,saveLobDataFile,FALSE);//ok to ignore error. - + HDFS_Client_RetCode hdfsClientRetcode = HdfsClient::hdfsDeletePath(saveLobDataFile);//ok to ignore error. 
return rc; } /////////////////////////////////////////////////////////////////////////////// @@ -2055,6 +2134,29 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, I bytesAvailable = cursor.descSize_ - cursor.bytesRead_; bytesToCopy = min(bytesAvailable, tgtSize - operLen); offset = cursor.descOffset_ + cursor.bytesRead_; + if (!useLibHdfs_) { + HDFS_Client_RetCode hdfsClientRetcode; + if (storage_ == Lob_External_HDFS_File) { + HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode); + ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); + hdfsClientRetcode = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_SOURCE_FILE_OPEN_ERROR; + bytesRead = srcHdfsClient->hdfsRead(offset, tgt, bytesToCopy, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) { + HdfsClient::deleteInstance(srcHdfsClient); + return LOB_SOURCE_FILE_READ_ERROR; + } + HdfsClient::deleteInstance(srcHdfsClient); + } + else { + bytesRead = hdfsClient_->hdfsRead(offset, tgt, bytesToCopy, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_READ_ERROR; + } + } + else { + // #endif if (!fdData_ || (openFlags_ != O_RDONLY)) @@ -2089,7 +2191,7 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, I } Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs; stats_.CumulativeReadTime += totalnsecs; - + } // useLibHdfs if (bytesRead == -1) { return LOB_DATA_READ_ERROR; } else if (bytesRead == 0) { @@ -2102,9 +2204,10 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 tgtSize, cursor_t &cursor, I operLen += bytesRead; tgt += bytesRead; } - - hdfsCloseFile(fs_, fdData_); - fdData_ = NULL; + if (useLibHdfs_) { + hdfsCloseFile(fs_, fdData_); + fdData_ = NULL; + } return LOB_OPER_OK; } @@ -2133,6 +2236,8 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr, return 
err; + if (useLibHdfs_) + { if (fdData_)// we may have a stale handle. close and open to refresh { hdfsCloseFile(fs_, fdData_); @@ -2161,10 +2266,19 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr, } - + } // useLibHdfs_ if (!multipleChunks) { + if (! useLibHdfs_) { + HDFS_Client_RetCode hdfsClientRetcode; + Int32 readLen; + readLen = hdfsClient_->hdfsRead(offset, memAddr, size, hdfsClientRetcode); + if (hdfsClientRetcode != HDFS_CLIENT_OK) + return LOB_DATA_READ_ERROR; + operLen = readLen; + return LOB_OPER_OK; + } lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_); if ((bytesRead = hdfsPread(fs_, fdData_, offset, memAddr, size)) == -1) { @@ -2972,28 +3086,8 @@ if (!lobGlobals->isHive() ) void cleanupLOBDataDescFiles(const char *lobHdfsServer,int lobHdfsPort,const char *lobHdfsLoc) { - int numExistingFiles=0; - hdfsFS fs; - int err = 0; - fs = hdfsConnect(lobHdfsServer, lobHdfsPort); - if (fs == NULL) - return; - // Get this list of all data and desc files in the lob sotrage location - hdfsFileInfo *fileInfos = hdfsListDirectory(fs, lobHdfsLoc, &numExistingFiles); - if (fileInfos == NULL) - return ; - - //Delete each one in a loop - for (int i = 0; i < numExistingFiles; i++) - { - err = hdfsDelete(fs, fileInfos[i].mName, 0); - } - - // *Note* : delete the memory allocated by libhdfs for the file info array - if (fileInfos) - { - hdfsFreeFileInfo(fileInfos, numExistingFiles); - } + HDFS_Client_RetCode hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobHdfsLoc);//ok to ignore error. 
+ return; } @@ -3392,7 +3486,8 @@ ExLobGlobals::ExLobGlobals(NAHeap *lobHeap) : threadTraceFile_(NULL), lobTrace_(FALSE), numWorkerThreads_(0), - heap_(lobHeap) + heap_(lobHeap), + useLibHdfs_(FALSE) { //initialize the log file if (getenv("TRACE_HDFS_THREAD_ACTIONS")) @@ -3403,6 +3498,12 @@ ExLobGlobals::ExLobGlobals(NAHeap *lobHeap) : } if(getenv("TRACE_LOB_ACTIONS")) lobTrace_ = TRUE; + char *useLibHdfsStr = getenv("USE_LIBHDFS"); + int useLibHdfs = 0; + if (useLibHdfsStr != NULL) + useLibHdfs = atoi(useLibHdfsStr); + if (useLibHdfs != 0) + useLibHdfs_ = TRUE; } ExLobGlobals::~ExLobGlobals() @@ -3481,10 +3582,17 @@ ExLobGlobals::~ExLobGlobals() //delete the lobMap AFTER the worker threads have finished their pending //work since they may still be using an objetc that was fetched off the lobMap_ if (lobMap_) - { + { + lobMap_it it2; + for (it2 = lobMap_->begin(); it2 != lobMap_->end() ; ++it2) + { + ExLob *lobPtr = it2->second; + NADELETE(lobPtr, ExLob, heap_); + } + lobMap_->clear(); NADELETE(lobMap_,lobMap_t,heap_); lobMap_ = NULL; - } + } //msg_mon_close_process(&serverPhandle); if (threadTraceFile_) http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOBaccess.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBaccess.h b/core/sql/exp/ExpLOBaccess.h index 3c5e93e..416c3cc 100644 --- a/core/sql/exp/ExpLOBaccess.h +++ b/core/sql/exp/ExpLOBaccess.h @@ -397,6 +397,10 @@ class ExLob : public NABasicObject ExLob(NAHeap * heap); // default constructor virtual ~ExLob(); // default desctructor + void setUseLibHdfs(NABoolean useLibHdfs) + { useLibHdfs_ = useLibHdfs; } + + NABoolean useLibHdfs() { return useLibHdfs_; } Ex_Lob_Error initialize(const char *lobFile, Ex_Lob_Mode mode, char *dir, LobsStorage storage, char *hdfsServer, Int64 hdfsPort, char *lobLocation, @@ -515,6 +519,8 @@ class ExLob : public NABasicObject bool prefetchQueued_; NAHeap *lobGlobalHeap_; NABoolean lobTrace_; + 
NABoolean useLibHdfs_; + HdfsClient *hdfsClient_; }; typedef map<string, ExLob *> lobMap_t; @@ -619,6 +625,11 @@ class ExLobGlobals { return heap_; } + + NABoolean useLibHdfs() { return useLibHdfs_; } + void setUseLibHdfs(NABoolean useLibHdfs) + { useLibHdfs_ = useLibHdfs; } + void traceMessage(const char *logMessage, ExLobCursor *c, int line); public : @@ -639,6 +650,7 @@ class ExLobGlobals NAHeap *heap_; NABoolean lobTrace_; long numWorkerThreads_; + NABoolean useLibHdfs_; }; http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOBinterface.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOBinterface.cpp b/core/sql/exp/ExpLOBinterface.cpp index 1a6d6ed..a90a0cb 100644 --- a/core/sql/exp/ExpLOBinterface.cpp +++ b/core/sql/exp/ExpLOBinterface.cpp @@ -65,16 +65,17 @@ Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap * parentHeap, 0); if (exLobGlob) { - - if (isHiveRead) + if (exLobGlob->useLibHdfs_) + { + if (isHiveRead) { ((ExLobGlobals *)exLobGlob)->startWorkerThreads(); lobHeap->setThreadSafe(); } - - + } } - + if (exLobGlob->useLibHdfs_) + { //set hdfsConnection from context global ContextCli *localContext = (ContextCli *)currContext; if (localContext) @@ -89,7 +90,8 @@ Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap * parentHeap, ((ExLobGlobals *)exLobGlob)->setHdfsFs(fs); } } - + } + if (err != LOB_OPER_OK) return err; else http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/regress/executor/EXPECTED130 ---------------------------------------------------------------------- diff --git a/core/sql/regress/executor/EXPECTED130 b/core/sql/regress/executor/EXPECTED130 index fa69e88..efedb9c 100644 --- a/core/sql/regress/executor/EXPECTED130 +++ b/core/sql/regress/executor/EXPECTED130 @@ -652,12 +652,12 @@ And the dish ran away with the fork ! 
>> >>insert into tlob130txt_limit50 values(1,filetolob('lob_input_e1.txt')); -*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail 0. +*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail: 0. Cause: . --- 0 row(s) inserted. >>insert into tlob130bin_limit1K values(1,filetolob('anoush.jpg')); -*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail 0. +*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail: 0. Cause: . --- 0 row(s) inserted. >>--test extract http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 2f24dce..7d1b43b 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -292,6 +292,7 @@ public class HDFSClient boolean hdfsCreate(String fname , boolean overwrite, boolean compress) throws IOException { + filename_ = fname; if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsCreate() - started" ); if (!compress || (compress && fname.endsWith(".gz"))) @@ -302,27 +303,17 @@ public class HDFSClient fs_ = FileSystem.get(filepath_.toUri(),config_); compressed_ = compress; fsdis_ = null; - FSDataOutputStream fsOut; + FSDataOutputStream fsOut = null; if (overwrite) fsOut = fs_.create(filepath_); - else - if (fs_.exists(filepath_)) - fsOut = fs_.append(filepath_); - else - fsOut = fs_.create(filepath_); - - if (compressed_) { - 
GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_); - Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec); - outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor); - } - else - outStream_ = fsOut; + if (fsOut != null) + fsOut.close(); return true; - } + } boolean hdfsOpen(String fname , boolean compress) throws IOException { + filename_ = fname; if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsOpen() - started" ); if (!compress || (compress && fname.endsWith(".gz"))) @@ -335,6 +326,43 @@ fsdis_ = null; return true; } + + long hdfsSize() throws IOException + { + FileStatus filestatus; + try + { + filestatus = fs_.getFileStatus(filepath_); + } catch (java.io.FileNotFoundException e) + { + return 0; + } + if (filestatus.isFile()) + return filestatus.getLen(); + else + return -1; + } + + long hdfsWriteImmediate(byte[] buff) throws IOException + { + if (logger_.isDebugEnabled()) + logger_.debug("HDFSClient.hdfsWriteImmediate() - started" ); + FSDataOutputStream fsOut; + FileStatus filestatus; + long writeOffset; + if (fs_.exists(filepath_)) { + filestatus = fs_.getFileStatus(filepath_); + fsOut = fs_.append(filepath_); + writeOffset = filestatus.getLen(); + } + else { + fsOut = fs_.create(filepath_); + writeOffset = 0; + } + fsOut.write(buff); + fsOut.close(); + return writeOffset; + } int hdfsWrite(byte[] buff) throws IOException { @@ -359,16 +387,19 @@ logger_.debug("HDFSClient.hdfsWrite() - output stream created" ); } outStream_.write(buff); + if (outStream_ instanceof FSDataOutputStream) + ((FSDataOutputStream)outStream_).hsync(); if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsWrite() - bytes written " + buff.length); return buff.length; } - int hdfsRead(ByteBuffer buffer) throws IOException + int hdfsRead(long pos, ByteBuffer buffer) throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsRead() - started" ); if 
(fsdis_ == null && inStream_ == null ) { + try { codec_ = codecFactory_.getCodec(filepath_); if (codec_ != null) { compressed_ = true; @@ -376,19 +407,34 @@ } else fsdis_ = fs_.open(filepath_); - pos_ = 0; + } catch (java.io.FileNotFoundException e) { + return 0; + } } int lenRemain; int bytesRead; int totalBytesRead = 0; int bufLen; int bufOffset = 0; + if (compressed_) { + if (pos != 0 && pos != -1) + throw new IOException("Compressed files cannot be read from a non-zero position"); + else + pos_ = 0; + } + else + if (pos != -1) + pos_ = pos; if (compressed_ && bufArray_ != null) bufArray_ = new byte[ioByteArraySizeInKB_ * 1024]; if (buffer.hasArray()) bufLen = buffer.array().length; else + { + if (pos_ != -1) + fsdis_.seek(pos_); bufLen = buffer.capacity(); + } lenRemain = bufLen; do { @@ -413,7 +459,6 @@ { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" ); if (outStream_ != null) { - outStream_.flush(); outStream_.close(); outStream_ = null; } @@ -422,7 +467,24 @@ return true; } - + static long hdfsSize(String filename) throws IOException + { + Path filepath = new Path(filename); + FileSystem fs = FileSystem.get(filepath.toUri(),config_); + FileStatus filestatus; + try + { + filestatus = fs.getFileStatus(filepath); + } catch (java.io.FileNotFoundException e) + { + return 0; + } + if (filestatus.isFile()) + return filestatus.getLen(); + else + return -1; + } + public static boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException { if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start"); @@ -547,7 +609,7 @@ return 0; } - + public void stop() throws IOException { if (future_ != null) { @@ -669,6 +731,17 @@ return true; } + public static boolean hdfsRename(String fromPathStr, String toPathStr) throws IOException + { + if (logger_.isDebugEnabled()) + 
logger_.debug("HDFSClient.hdfsRename(" + fromPathStr + ", " + toPathStr + ")"); + Path fromPath = new Path(fromPathStr ); + Path toPath = new Path(toPathStr ); + FileSystem fs = FileSystem.get(fromPath.toUri(), config_); + fs.rename(fromPath, toPath); + return true; + } + private native int sendFileStatus(long jniObj, int numFiles, int fileNo, boolean isDir, String filename, long modTime, long len, short numReplicas, long blockSize, String owner, String group,