Repository: trafodion
Updated Branches:
  refs/heads/master 072708f1b -> a6512c05c


[TRAFODION-3110] Refactor LOB access to use the new implementation of HdfsClient

This feature is enabled by default. To disable, set a variable USE_LIBHDFS=1 in
$TRAF_HOME/etc/ms.env and restart the trafodion cluster.

This feature includes the following:
1. Uses single FSDataInputStream for each LOB column in a query as
   opposed to the opening the hdfs file for every row.
2. Uses FSDataOutputStream to write the lob data but closes it
   immediately to allow concurrent writes to the hdfs file. HDFS supports
   a single writer at a time. Need to conform if multiple writes can
   be done without the need for RMS lock feature.
3. Improved error messaging that displays the java exception stack to the
   end user.
4. LOB worker threads are no longer created


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/52f074af
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/52f074af
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/52f074af

Branch: refs/heads/master
Commit: 52f074af0e866fe82ea4f37eaf90cdc880871851
Parents: 5b5ab32
Author: selvaganesang <selva.govindara...@esgyn.com>
Authored: Fri Jun 15 22:53:04 2018 +0000
Committer: selvaganesang <selva.govindara...@esgyn.com>
Committed: Fri Jun 15 23:15:13 2018 +0000

----------------------------------------------------------------------
 core/sql/bin/SqlciErrors.txt                    |   2 +-
 core/sql/cli/Cli.cpp                            |  16 +-
 core/sql/executor/ExExeUtilGet.cpp              |  46 +--
 core/sql/executor/ExExeUtilLoad.cpp             |  26 +-
 core/sql/executor/HdfsClient_JNI.cpp            | 206 +++++++++-
 core/sql/executor/HdfsClient_JNI.h              |  20 +-
 core/sql/executor/ex_globals.cpp                |   2 +-
 core/sql/exp/ExpLOB.cpp                         |  12 +-
 core/sql/exp/ExpLOBaccess.cpp                   | 390 ++++++++++++-------
 core/sql/exp/ExpLOBaccess.h                     |  12 +
 core/sql/exp/ExpLOBinterface.cpp                |  14 +-
 core/sql/regress/executor/EXPECTED130           |   4 +-
 .../main/java/org/trafodion/sql/HDFSClient.java | 113 +++++-
 13 files changed, 607 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/bin/SqlciErrors.txt
----------------------------------------------------------------------
diff --git a/core/sql/bin/SqlciErrors.txt b/core/sql/bin/SqlciErrors.txt
index ee51272..4e644e8 100644
--- a/core/sql/bin/SqlciErrors.txt
+++ b/core/sql/bin/SqlciErrors.txt
@@ -1578,7 +1578,7 @@ $1~String1 --------------------------------
 8437 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Mismatch detected between external 
table and underlying hive table definitions.
 8440 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The size of the history buffer is too 
small to execute one or more of the OLAP Windowed Functions in the query. 
 8441 ZZZZZ 99999 BEGINNER MAJOR DBADMIN one or more of the OLAP Windowed 
Functions in the query may require overflow which is not supported yet.
-8442 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Unable to access $0~string0 interface. 
Call to $1~string1 returned error $2~string2($0~int0). Error detail $1~int1.
+8442 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Unable to access $0~string0 interface. 
Call to $0~string0 returned error $1~string1($0~int0). Error detail: $1~int1. 
Cause: $2~string2.
 8443 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Provided LOB handle is invalid.
 8444 ZZZZZ 99999 BEGINNER MAJOR DBADMIN Only one lob handle can be returned by 
child for data extract.
 8445 ZZZZZ 99999 BEGINNER MAJOR DBADMIN An error occurred during 
transformation of hdfs row to sql row. Error Detail: $0~string0 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/cli/Cli.cpp
----------------------------------------------------------------------
diff --git a/core/sql/cli/Cli.cpp b/core/sql/cli/Cli.cpp
index a942bca..d641f6d 100644
--- a/core/sql/cli/Cli.cpp
+++ b/core/sql/cli/Cli.cpp
@@ -9397,7 +9397,7 @@ Lng32 SQLCLI_LOB_GC_Interface
       ExRaiseSqlError(currContext.exHeap(), &da, 
                       (ExeErrorCode)(8442), NULL, &cliRC    , 
                       &rc, NULL, (char*)"Lob GC call",
-                      getLobErrStr(rc));
+                     getLobErrStr(rc), (char*)getSqlJniErrorStr());
       // TBD When local transaction support is in
       // rollback all the updates to the lob desc chunks file too. 
       // return with warning
@@ -9533,8 +9533,7 @@ Lng32 SQLCLI_LOBddlInterface
             ExRaiseSqlError(currContext.exHeap(), &da, 
                            (ExeErrorCode)(8442), NULL, &cliRC    , 
                            &rc, NULL, (char*)"ExpLOBInterfaceCreate",
-
-                           getLobErrStr(rc));
+                           getLobErrStr(rc), (char*)getSqlJniErrorStr());
             goto non_cli_error_return;
           }
           
@@ -9553,8 +9552,7 @@ Lng32 SQLCLI_LOBddlInterface
                ExRaiseSqlError(currContext.exHeap(), &da, 
                            (ExeErrorCode)(8442), NULL, &cliRC    , 
                            &rc, NULL, (char*)"ExpLOBInterfaceCreate",
-
-                           getLobErrStr(rc));
+                           getLobErrStr(rc), (char*)getSqlJniErrorStr());
                goto non_cli_error_return;
              }
            
@@ -9650,7 +9648,7 @@ Lng32 SQLCLI_LOBddlInterface
             ExRaiseSqlError(currContext.exHeap(), &da, 
                            (ExeErrorCode)(8442), NULL, &cliRC    , 
                            &rc, NULL, (char*)"ExpLOBInterfaceCreate",
-                           getLobErrStr(rc));
+                           getLobErrStr(rc), (char*)getSqlJniErrorStr());
             goto non_cli_error_return;
              
           }
@@ -9668,7 +9666,7 @@ Lng32 SQLCLI_LOBddlInterface
                ExRaiseSqlError(currContext.exHeap(), &da, 
                            (ExeErrorCode)(8442), NULL, &cliRC    , 
                            &rc, NULL, (char*)"ExpLOBInterfaceDrop  ",
-                           getLobErrStr(rc));
+                           getLobErrStr(rc), (char*)getSqlJniErrorStr());
                goto non_cli_error_return;
               }
           }//for
@@ -9700,7 +9698,7 @@ Lng32 SQLCLI_LOBddlInterface
             ExRaiseSqlError(currContext.exHeap(), &da, 
                            (ExeErrorCode)(8442), NULL, &cliRC    , 
                             &rc, NULL, (char*)"ExpLOBInterfaceCreate",
-                            getLobErrStr(rc));
+                           getLobErrStr(rc), (char*)getSqlJniErrorStr());
             goto non_cli_error_return;       
           }
        // drop descriptor table
@@ -9718,7 +9716,7 @@ Lng32 SQLCLI_LOBddlInterface
                ExRaiseSqlError(currContext.exHeap(), &da, 
                            (ExeErrorCode)(8442), NULL, &cliRC    , 
                            &rc, NULL, (char*)"ExpLOBInterfaceDrop  ",
-                           getLobErrStr(rc));
+                           getLobErrStr(rc), (char*)getSqlJniErrorStr());
                goto non_cli_error_return;
              }
            

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/ExExeUtilGet.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilGet.cpp 
b/core/sql/executor/ExExeUtilGet.cpp
index 163b190..f4d0ade 100644
--- a/core/sql/executor/ExExeUtilGet.cpp
+++ b/core/sql/executor/ExExeUtilGet.cpp
@@ -7733,26 +7733,12 @@ else
     return rc;  
                 
   //EOD of LOB data file
-  
-  hdfsFS fs = 
currContext->getHdfsServerConnection((char*)getLItdb().getHdfsServer(),getLItdb().getHdfsPort());
-  if (fs == NULL)
-    return LOB_DATA_FILE_OPEN_ERROR;
-
-  
   snprintf(lobDataFilePath, LOBINFO_MAX_FILE_LEN, "%s/%s", lobLocation, 
lobDataFile);
-  hdfsFile fdData = hdfsOpenFile(fs, lobDataFilePath,O_RDONLY,0,0,0);
-  if (!fdData) 
-    {
-      hdfsCloseFile(fs,fdData);
-      fdData = NULL;
-      return LOB_DATA_FILE_OPEN_ERROR;
-    }
-  hdfsFileInfo *fInfo = hdfsGetPathInfo(fs, lobDataFilePath);
-  if (fInfo)
-    lobEOD = fInfo->mSize;
-  else
-    lobEOD = 0;
-  
+  HDFS_Client_RetCode hdfsClientRetcode;
+  lobEOD = HdfsClient::hdfsSize(lobDataFilePath, hdfsClientRetcode);
+  if (hdfsClientRetcode != HDFS_CLIENT_OK) 
+     return LOB_DATA_FILE_OPEN_ERROR;
+
   str_sprintf(buf, "  LOB EOD :  %ld", lobEOD);
   if (moveRowToUpQueue(buf, strlen(buf), &rc))
     return rc;
@@ -8034,24 +8020,12 @@ short ExExeUtilLobInfoTableTcb::collectLobInfo(char * 
tableName,Int32 currLobNum
       str_cpy_all(lobInfo_->lobDataFile,  lobDataFile,strlen(lobDataFile));
     }             
   //EOD of LOB data file
-  // hdfsFS fs = 
hdfsConnect(getLItdb().getHdfsServer(),getLItdb().getHdfsPort());
-  hdfsFS fs = 
currContext->getHdfsServerConnection((char*)getLItdb().getHdfsServer(),getLItdb().getHdfsPort());
-  if (fs == NULL)
-    return LOB_DATA_FILE_OPEN_ERROR;
-
   snprintf(lobDataFilePath, LOBINFO_MAX_FILE_LEN, "%s/%s", lobLocation, 
lobDataFile);
-  hdfsFile fdData = hdfsOpenFile(fs, lobDataFilePath,O_RDONLY,0,0,0);
-  if (!fdData) 
-    {
-      hdfsCloseFile(fs,fdData);
-      fdData = NULL;
-      return LOB_DATA_FILE_OPEN_ERROR;
-    }
-  hdfsFileInfo *fInfo = hdfsGetPathInfo(fs, lobDataFilePath);
-  if (fInfo)
-    lobEOD = fInfo->mSize;
-  else
-    lobEOD = 0;
+  HDFS_Client_RetCode hdfsClientRetcode;
+  lobEOD = HdfsClient::hdfsSize(lobDataFilePath, hdfsClientRetcode);
+  if (hdfsClientRetcode != HDFS_CLIENT_OK) 
+     return LOB_DATA_FILE_OPEN_ERROR;
+
   lobInfo_->lobDataFileSizeEod=lobEOD;
   // Sum of all the lobDescChunks for used space
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/ExExeUtilLoad.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilLoad.cpp 
b/core/sql/executor/ExExeUtilLoad.cpp
index 160a6bb..235293c 100644
--- a/core/sql/executor/ExExeUtilLoad.cpp
+++ b/core/sql/executor/ExExeUtilLoad.cpp
@@ -3052,7 +3052,7 @@ short ExExeUtilLobExtractTcb::work()
                    ExRaiseSqlError(getHeap(), &diagsArea_, 
                                    (ExeErrorCode)(8442), NULL, &intParam1, 
                                    &cliError, NULL, 
(char*)"ExpLOBInterfaceSelect",
-                                   getLobErrStr(intParam1));
+                                   getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                    step_ = HANDLE_ERROR_;
                    break;
                  }
@@ -3098,7 +3098,7 @@ short ExExeUtilLobExtractTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceSelectCursor",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }
@@ -3151,7 +3151,7 @@ short ExExeUtilLobExtractTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceSelectCursor",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }
@@ -3219,7 +3219,7 @@ short ExExeUtilLobExtractTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceSelectCursor",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              } 
@@ -3507,7 +3507,7 @@ short ExExeUtilLobUpdateTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }  
@@ -3590,7 +3590,7 @@ short ExExeUtilLobUpdateTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }  
@@ -3675,7 +3675,7 @@ short ExExeUtilLobUpdateTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }  
@@ -3848,7 +3848,7 @@ short ExExeUtilFileExtractTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceSelectCursor/open",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }
@@ -3894,7 +3894,7 @@ short ExExeUtilFileExtractTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceSelectCursor/read",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }
@@ -3949,7 +3949,7 @@ short ExExeUtilFileExtractTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceSelectCursor/close",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }
@@ -4084,7 +4084,7 @@ short ExExeUtilFileLoadTcb::work()
                    ExRaiseSqlError(getHeap(), &diagsArea_, 
                                    (ExeErrorCode)(8442), NULL, &intParam1, 
                                    &cliError, NULL, 
(char*)"ExpLOBInterfaceCreate",
-                                   getLobErrStr(intParam1));
+                                   getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                    step_ = HANDLE_ERROR_;
                    break;
                  }
@@ -4199,7 +4199,7 @@ short ExExeUtilFileLoadTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, (char*)"ExpLOBInterfaceInsert",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }
@@ -4229,7 +4229,7 @@ short ExExeUtilFileLoadTcb::work()
                ExRaiseSqlError(getHeap(), &diagsArea_, 
                                (ExeErrorCode)(8442), NULL, &intParam1, 
                                &cliError, NULL, 
(char*)"ExpLOBInterfaceCloseFile",
-                               getLobErrStr(intParam1));
+                               getLobErrStr(intParam1), 
(char*)getSqlJniErrorStr());
                step_ = HANDLE_ERROR_;
                break;
              }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp 
b/core/sql/executor/HdfsClient_JNI.cpp
index 72157bf..930b7f4 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -332,6 +332,8 @@ static const char* const hdfsClientErrorEnumStr[] =
  ,"Java exception in HdfsClient::hdfsOpen()."
  ,"JNI NewStringUTF() in HdfsClient::hdfsWrite()."
  ,"Java exception in HdfsClient::hdfsWrite()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsWriteImmediate()."
+ ,"Java exception in HdfsClient::hdfsWriteImmediate()."
  ,"Error in HdfsClient::hdfsRead()."
  ,"Java exception in HdfsClient::hdfsRead()."
  ,"Java exception in HdfsClient::hdfsClose()."
@@ -353,6 +355,10 @@ static const char* const hdfsClientErrorEnumStr[] =
  ,"Buffer is small in HdfsClient::getFsDefaultName()."
  ,"Error in HdfsClient::hdfsCreateDirectory()."
  ,"Java exception in HdfsClient::hdfsCreateDirectory()."
+ ,"Error in HdfsClient::hdfsRename()."
+ ,"Java exception in HdfsClient::hdfsRename()."
+ ,"Error in HdfsClient::hdfsSize()."
+ ,"Java exception in HdfsClient::hdfsSize()."
 };
 
 //////////////////////////////////////////////////////////////////////////////
@@ -416,15 +422,10 @@ HdfsClient* HdfsClient::getInstance()
    return hdfsClient;
 }
 
-void HdfsClient::deleteInstance()
+void HdfsClient::deleteInstance(HdfsClient *hdfsClient)
 {
-   ContextCli *currContext = GetCliGlobals()->currContext();
-   HdfsClient *hdfsClient = currContext->getHDFSClient();
-   if (hdfsClient != NULL) {
-      NAHeap *heap = currContext->exHeap();
-      NADELETE(hdfsClient, HdfsClient, heap);
-      currContext->setHDFSClient(NULL);
-   }
+  hdfsClient->hdfsClose();
+  NADELETE(hdfsClient, HdfsClient, hdfsClient->getHeap());
 }
 
 HDFS_Client_RetCode HdfsClient::init()
@@ -452,8 +453,10 @@ HDFS_Client_RetCode HdfsClient::init()
     JavaMethods_[JM_HDFS_OPEN       ].jm_signature = "(Ljava/lang/String;Z)Z";
     JavaMethods_[JM_HDFS_WRITE      ].jm_name      = "hdfsWrite";
     JavaMethods_[JM_HDFS_WRITE      ].jm_signature = "([B)I";
+    JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_name      = "hdfsWriteImmediate";
+    JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_signature = "([B)J";
     JavaMethods_[JM_HDFS_READ       ].jm_name      = "hdfsRead";
-    JavaMethods_[JM_HDFS_READ       ].jm_signature = 
"(Ljava/nio/ByteBuffer;)I";
+    JavaMethods_[JM_HDFS_READ       ].jm_signature = 
"(JLjava/nio/ByteBuffer;)I";
     JavaMethods_[JM_HDFS_CLOSE      ].jm_name      = "hdfsClose";
     JavaMethods_[JM_HDFS_CLOSE      ].jm_signature = "()Z";
     JavaMethods_[JM_HDFS_MERGE_FILES].jm_name      = "hdfsMergeFiles";
@@ -472,6 +475,12 @@ HDFS_Client_RetCode HdfsClient::init()
     JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_signature = "()Ljava/lang/String;";
     JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_name      = 
"hdfsCreateDirectory";
     JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_signature = 
"(Ljava/lang/String;)Z";
+    JavaMethods_[JM_HDFS_RENAME].jm_name      = "hdfsRename";
+    JavaMethods_[JM_HDFS_RENAME].jm_signature = 
"(Ljava/lang/String;Ljava/lang/String;)Z";
+    JavaMethods_[JM_HDFS_SIZE].jm_name      = "hdfsSize";
+    JavaMethods_[JM_HDFS_SIZE].jm_signature = "()J";
+    JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_name      = "hdfsSize";
+    JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_signature = "(Ljava/lang/String;)J";
     rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, 
JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
     if (rc == HDFS_CLIENT_OK)
        javaMethodsInitialized_ = TRUE;
@@ -551,7 +560,7 @@ HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, 
NABoolean compress)
 
   if (initJNIEnv() != JOI_OK)
      return HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM;
-
+  setPath(path);
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) {
     
GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_OPEN_PARAM));
@@ -587,8 +596,37 @@ HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, 
NABoolean compress)
   return HDFS_CLIENT_OK;
 }
 
+Int64 HdfsClient::hdfsSize(HDFS_Client_RetCode &hdfsClientRetcode)
+{
+  QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsSize() called.");
+   
+  if (initJNIEnv() != JOI_OK) {
+     hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_PARAM;
+     return -1;
+  }
 
-Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode 
&hdfsClientRetcode)
+  if (hdfsStats_ != NULL)
+     hdfsStats_->getHdfsTimer().start();
+  tsRecentJMFromJNI = JavaMethods_[JM_HDFS_SIZE].jm_full_name;
+  jlong jresult = jenv_->CallLongMethod(javaObj_, 
JavaMethods_[JM_HDFS_SIZE].methodID);
+  if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+  }
+
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsSize()");
+    jenv_->PopLocalFrame(NULL);
+    hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_EXCEPTION;
+    return -1;
+  }
+  hdfsClientRetcode = HDFS_CLIENT_OK;
+  return jresult;
+}
+
+
+Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode 
&hdfsClientRetcode, int maxChunkSize)
 {
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", 
len);
 
@@ -598,8 +636,9 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, 
HDFS_Client_RetCode &hd
   }
   Int64 lenRemain = len;
   Int64 writeLen;
-  Int64 chunkLen = (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 
0);
+  Int64 chunkLen = (maxChunkSize > 0 ? maxChunkSize : (ioByteArraySizeInKB_ > 
0 ? ioByteArraySizeInKB_ * 1024 : 0));
   Int64 offset = 0;
+  jint bytesWritten;
   do 
   {
      if ((chunkLen > 0) && (lenRemain > chunkLen))
@@ -621,7 +660,7 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, 
HDFS_Client_RetCode &hd
 
      tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
      // Java method returns the cumulative bytes written
-     jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, 
JavaMethods_[JM_HDFS_WRITE].methodID, jbArray);
+     bytesWritten = jenv_->CallIntMethod(javaObj_, 
JavaMethods_[JM_HDFS_WRITE].methodID, jbArray);
 
      if (hdfsStats_ != NULL) {
          hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
@@ -639,12 +678,70 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, 
HDFS_Client_RetCode &hd
   } while (lenRemain > 0);
   jenv_->PopLocalFrame(NULL);
   hdfsClientRetcode = HDFS_CLIENT_OK;
-  return len; 
+  return bytesWritten; 
 }
 
-Int32 HdfsClient::hdfsRead(const char* data, Int64 len, HDFS_Client_RetCode 
&hdfsClientRetcode)
+Int64 HdfsClient::hdfsWriteImmediate(const char* data, Int64 len, 
HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize)
 {
-   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", 
len);
+  QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWriteImmediate(%ld) 
called.", len);
+
+  if (initJNIEnv() != JOI_OK) {
+     hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION;
+     return 0;
+  }
+  Int64 lenRemain = len;
+  Int64 writeLen;
+  Int64 chunkLen = (maxChunkSize > 0 ? maxChunkSize : (ioByteArraySizeInKB_ > 
0 ? ioByteArraySizeInKB_ * 1024 : 0));
+  Int64 offset = 0;
+  jlong writeOffset = -1;
+  jlong chunkWriteOffset;
+  do 
+  {
+     if ((chunkLen > 0) && (lenRemain > chunkLen))
+        writeLen = chunkLen; 
+     else
+        writeLen = lenRemain;
+     //Write the requisite bytes into the file
+     jbyteArray jbArray = jenv_->NewByteArray(writeLen);
+     if (!jbArray) {
+        
GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM));
+        jenv_->PopLocalFrame(NULL);
+        hdfsClientRetcode =  HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM;
+        return 0;
+     }
+     jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const 
jbyte*)(data+offset));
+
+     if (hdfsStats_ != NULL)
+         hdfsStats_->getHdfsTimer().start();
+
+     tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].jm_full_name;
+     // Java method returns the cumulative bytes written
+     chunkWriteOffset = jenv_->CallIntMethod(javaObj_, 
JavaMethods_[JM_HDFS_WRITE_IMMEDIATE].methodID, jbArray);
+     if (writeOffset == -1)
+        writeOffset = chunkWriteOffset;
+
+     if (hdfsStats_ != NULL) {
+         hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+         hdfsStats_->incHdfsCalls();
+     }
+     if (jenv_->ExceptionCheck())
+     {
+        getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()");
+        jenv_->PopLocalFrame(NULL);
+        hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION;
+        return 0;
+     }
+     lenRemain -= writeLen;
+     offset += writeLen;
+  } while (lenRemain > 0);
+  jenv_->PopLocalFrame(NULL);
+  hdfsClientRetcode = HDFS_CLIENT_OK;
+  return writeOffset;
+}
+
+Int32 HdfsClient::hdfsRead(Int64 pos, const char* data, Int64 len, 
HDFS_Client_RetCode &hdfsClientRetcode)
+{
+   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsRead(%ld) called.", 
len);
 
    if (initJNIEnv() != JOI_OK) {
       hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION;
@@ -661,7 +758,8 @@ Int32 HdfsClient::hdfsRead(const char* data, Int64 len, 
HDFS_Client_RetCode &hdf
 
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_READ].jm_full_name;
   jint bytesRead = 0;
-  bytesRead = jenv_->CallIntMethod(javaObj_, 
JavaMethods_[JM_HDFS_READ].methodID, j_buf);
+  jlong j_pos = pos;
+  bytesRead = jenv_->CallIntMethod(javaObj_, 
JavaMethods_[JM_HDFS_READ].methodID, j_pos, j_buf);
 
   if (hdfsStats_ != NULL) {
       hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
@@ -714,6 +812,38 @@ HDFS_Client_RetCode HdfsClient::hdfsClose()
   return HDFS_CLIENT_OK;
 }
 
+Int64 HdfsClient::hdfsSize(const char *filename, HDFS_Client_RetCode 
&hdfsClientRetcode)
+{
+  QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsSize() called.");
+   
+  if (initJNIEnv() != JOI_OK) {
+     hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_PARAM;
+     return -1;
+  }
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_SIZE_PARAM;
+
+  jstring j_filename = jenv_->NewStringUTF(filename);
+  if (j_filename == NULL) {
+    
GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_SIZE_PARAM));
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_SIZE_PARAM;
+  }
+
+  tsRecentJMFromJNI = JavaMethods_[JM_HDFS_SIZE_FOR_FILE].jm_full_name;
+  jlong jresult = jenv_->CallStaticLongMethod(javaClass_, 
JavaMethods_[JM_HDFS_SIZE_FOR_FILE].methodID, j_filename);
+
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsSize()");
+    jenv_->PopLocalFrame(NULL);
+    hdfsClientRetcode = HDFS_CLIENT_ERROR_SIZE_EXCEPTION;
+    return -1;
+  }
+  hdfsClientRetcode = HDFS_CLIENT_OK;
+  return jresult;
+}
+
 HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath)
 {
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) 
called.",
@@ -978,7 +1108,47 @@ HDFS_Client_RetCode HdfsClient::hdfsCreateDirectory(const 
NAString &dirName)
   {
     logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreateDirectory()", 
getLastError());
     jenv_->PopLocalFrame(NULL);
-    return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION;
+    return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION;
+  }
+  jenv_->PopLocalFrame(NULL);
+  return HDFS_CLIENT_OK;
+}
+
+HDFS_Client_RetCode HdfsClient::hdfsRename(const NAString &fromPath, const 
NAString &toPath)
+{
+  QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::hdfsRename() 
called.");
+  if (initJNIEnv() != JOI_OK)
+     return HDFS_CLIENT_ERROR_RENAME_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_RENAME_PARAM;
+
+  jstring js_fromPath = jenv_->NewStringUTF(fromPath.data());
+  if (js_fromPath == NULL) {
+     jenv_->PopLocalFrame(NULL);
+     return HDFS_CLIENT_ERROR_RENAME_PARAM;
+  }
+
+  jstring js_toPath = jenv_->NewStringUTF(toPath.data());
+  if (js_toPath == NULL) {
+     jenv_->PopLocalFrame(NULL);
+     return HDFS_CLIENT_ERROR_RENAME_PARAM;
+  }
+
+  tsRecentJMFromJNI = JavaMethods_[JM_HDFS_RENAME].jm_full_name;
+  jstring jresult = 
+        (jstring)jenv_->CallStaticObjectMethod(javaClass_,
+                              JavaMethods_[JM_HDFS_RENAME].methodID, 
js_fromPath, js_toPath);
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsRename()");
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_RENAME_EXCEPTION;
+  }
+  if (jresult == false)
+  {
+    logError(CAT_SQL_HDFS, "HdfsClient::hdfsRename()", getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_RENAME_EXCEPTION;
   }
   jenv_->PopLocalFrame(NULL);
   return HDFS_CLIENT_OK;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h 
b/core/sql/executor/HdfsClient_JNI.h
index 888451c..b4ef741 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -129,6 +129,8 @@ typedef enum {
  ,HDFS_CLIENT_ERROR_HDFS_OPEN_EXCEPTION
  ,HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM
  ,HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION
+ ,HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_PARAM
+ ,HDFS_CLIENT_ERROR_HDFS_WRITE_IMMEDIATE_EXCEPTION
  ,HDFS_CLIENT_ERROR_HDFS_READ_PARAM
  ,HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION
  ,HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION
@@ -151,6 +153,10 @@ typedef enum {
  ,HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_BUFFER_TOO_SMALL
  ,HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM
  ,HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION
+ ,HDFS_CLIENT_ERROR_RENAME_PARAM
+ ,HDFS_CLIENT_ERROR_RENAME_EXCEPTION
+ ,HDFS_CLIENT_ERROR_SIZE_PARAM
+ ,HDFS_CLIENT_ERROR_SIZE_EXCEPTION
  ,HDFS_CLIENT_LAST
 } HDFS_Client_RetCode;
 
@@ -171,7 +177,7 @@ public:
   ~HdfsClient();
   static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, 
HDFS_Client_RetCode &retCode, int hdfsIoByteArraySizeInKB = 0);
   static HdfsClient *getInstance();
-  static void deleteInstance();
+  static void deleteInstance(HdfsClient *hdfsClient);
   void setIoByteArraySize(int size)
       { ioByteArraySizeInKB_ = size; }
 
@@ -182,13 +188,16 @@ public:
   HDFS_Client_RetCode    init();
   HDFS_Client_RetCode    hdfsCreate(const char* path, NABoolean overwrite, 
NABoolean compress);
   HDFS_Client_RetCode    hdfsOpen(const char* path, NABoolean compress);
-  Int32                  hdfsWrite(const char* data, Int64 size, 
HDFS_Client_RetCode &hdfsClientRetcode);
-  Int32                  hdfsRead(const char* data, Int64 size, 
HDFS_Client_RetCode &hdfsClientRetcode);
+  Int64                  hdfsSize(HDFS_Client_RetCode &hdfsClientRetcode);
+  Int32                  hdfsWrite(const char* data, Int64 size, 
HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize = 0);
+  Int64                  hdfsWriteImmediate(const char* data, Int64 size, 
HDFS_Client_RetCode &hdfsClientRetcode, int maxChunkSize = 0);
+  Int32                  hdfsRead(Int64 offset, const char* data, Int64 size, 
HDFS_Client_RetCode &hdfsClientRetcode);
   HDFS_Client_RetCode    hdfsClose();
   HDFS_Client_RetCode    setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint 
fileNo, jboolean isDir, 
           jstring filename, jlong modTime, jlong len, jshort numReplicas, 
jlong blockSize, 
           jstring owner, jstring group, jshort permissions, jlong accessTime);
   HDFS_Client_RetCode    hdfsListDirectory(const char *pathStr, HDFS_FileInfo 
**hdfsFileInfo, int *numFiles);
+  static Int64                  hdfsSize(const char *filename, 
HDFS_Client_RetCode &hdfsClientRetcode);
   static HDFS_Client_RetCode    hdfsMergeFiles(const NAString& srcPath, const 
NAString& dstPath);
   static HDFS_Client_RetCode    hdfsCleanUnloadPath(const NAString& uldPath );
   static HDFS_Client_RetCode    hdfsExists(const NAString& uldPath,  NABoolean 
& exists );
@@ -199,6 +208,7 @@ public:
   // buf_len is the length of the buffer in bytes
   static HDFS_Client_RetCode    getFsDefaultName(char* buffer, Int32 buf_len);
   static HDFS_Client_RetCode    hdfsCreateDirectory(const NAString& path);
+  static HDFS_Client_RetCode    hdfsRename(const NAString& fromPath, const 
NAString& toPath);
 
 private:  
   enum JAVA_METHODS {
@@ -206,6 +216,7 @@ private:
     JM_HDFS_CREATE,
     JM_HDFS_OPEN,
     JM_HDFS_WRITE,
+    JM_HDFS_WRITE_IMMEDIATE,
     JM_HDFS_READ,
     JM_HDFS_CLOSE,
     JM_HDFS_MERGE_FILES,
@@ -216,6 +227,9 @@ private:
     JM_HIVE_TBL_MAX_MODIFICATION_TS,
     JM_GET_FS_DEFAULT_NAME,
     JM_HDFS_CREATE_DIRECTORY,
+    JM_HDFS_RENAME,
+    JM_HDFS_SIZE,
+    JM_HDFS_SIZE_FOR_FILE,
     JM_LAST
   };
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/executor/ex_globals.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ex_globals.cpp b/core/sql/executor/ex_globals.cpp
index df7e9ef..df2ca67 100644
--- a/core/sql/executor/ex_globals.cpp
+++ b/core/sql/executor/ex_globals.cpp
@@ -135,7 +135,7 @@ void ex_globals::deleteMe(NABoolean fatalError)
   statsArea_ = NULL;
   cleanupTcbs();
   tcbList_.deallocate();
-  ExpLOBinterfaceCleanup(exLobGlobals_);
+  NADELETE(exLobGlobals_, ExLobGlobals, exLobGlobals_->getHeap());
   exLobGlobals_ = NULL;
 
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOB.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOB.cpp b/core/sql/exp/ExpLOB.cpp
index 10b2bc1..fe09b43 100644
--- a/core/sql/exp/ExpLOB.cpp
+++ b/core/sql/exp/ExpLOB.cpp
@@ -860,7 +860,7 @@ else
       ExRaiseSqlError(h, diagsArea, 
                      (ExeErrorCode)(8442), NULL, &intParam1, 
                      &cliError, NULL, (char*)"ExpLOBInterfaceInsert",
-                     (char*)"ExpLOBInterfaceInsert",getLobErrStr(intParam1));
+                     getLobErrStr(intParam1), (char*)getSqlJniErrorStr());
       return ex_expr::EXPR_ERROR;
     }
 
@@ -1010,7 +1010,7 @@ ex_expr::exp_return_type ExpLOBiud::insertData(Lng32 
handleLen,
       ExRaiseSqlError(h, diagsArea, 
                      (ExeErrorCode)(8442), NULL, &intParam1, 
                      &cliError, NULL, (char*)"ExpLOBInterfaceInsert",
-                     (char*)"ExpLOBInterfaceInsert",getLobErrStr(intParam1));
+                     getLobErrStr(intParam1), (char*)getSqlJniErrorStr());
       return ex_expr::EXPR_ERROR;
     }
 
@@ -1166,7 +1166,7 @@ ex_expr::exp_return_type ExpLOBdelete::eval(char 
*op_data[],
       ExRaiseSqlError(h, diagsArea, 
                      (ExeErrorCode)(8442), NULL, &intParam1, 
                      &cliError, NULL, (char*)"ExpLOBInterfaceDelete",
-                     (char*)"ExpLOBInterfaceDelete",getLobErrStr(intParam1));
+                     getLobErrStr(intParam1), (char*)getSqlJniErrorStr());
       return ex_expr::EXPR_ERROR;
     }
 
@@ -1472,7 +1472,7 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char 
*op_data[],
       ExRaiseSqlError(h, diagsArea, 
                      (ExeErrorCode)(8442), NULL, &intParam1, 
                      &cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
-                     (char*)"ExpLOBInterfaceUpdate",getLobErrStr(intParam1));
+                     getLobErrStr(intParam1), (char*)getSqlJniErrorStr());
       return ex_expr::EXPR_ERROR;
      }
 
@@ -1611,7 +1611,7 @@ ex_expr::exp_return_type ExpLOBconvert::eval(char 
*op_data[],
        ExRaiseSqlError(h, diagsArea, 
                          (ExeErrorCode)(8442), NULL, &intParam1, 
                          &cliError, NULL, (char*)"ExpLOBInterfaceSelect",
-                         
(char*)"ExpLOBInterfaceSelect",getLobErrStr(intParam1));
+                         getLobErrStr(intParam1), (char*)getSqlJniErrorStr());
          return ex_expr::EXPR_ERROR;
     }
   if(toFile())
@@ -1669,7 +1669,7 @@ ex_expr::exp_return_type ExpLOBconvert::eval(char 
*op_data[],
          ExRaiseSqlError(h, diagsArea, 
                          (ExeErrorCode)(8442), NULL, &intParam1, 
                          &cliError, NULL, (char*)"ExpLOBInterfaceSelect",
-                         
(char*)"ExpLOBInterfaceSelect",getLobErrStr(intParam1));
+                         getLobErrStr(intParam1), (char*)getSqlJniErrorStr());
          return ex_expr::EXPR_ERROR;
        }
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOBaccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp
index a804959..3b4cd93 100644
--- a/core/sql/exp/ExpLOBaccess.cpp
+++ b/core/sql/exp/ExpLOBaccess.cpp
@@ -85,7 +85,9 @@ ExLob::ExLob(NAHeap * heap) :
     fs_(NULL),
     fdData_(NULL),
     openFlags_(0),
-    lobTrace_(FALSE)
+    lobTrace_(FALSE),
+    useLibHdfs_(FALSE),
+    hdfsClient_(NULL)    
 {
   // nothing else to do
 }
@@ -97,6 +99,10 @@ ExLob::~ExLob()
       hdfsCloseFile(fs_, fdData_);
       fdData_ = NULL;
     }
+    if (hdfsClient_ != NULL) {
+       HdfsClient::deleteInstance(hdfsClient_);
+       hdfsClient_ = NULL;
+    }
    
 }
 
@@ -113,6 +119,7 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, 
Ex_Lob_Mode mode,
   struct timespec endTime;
   Int64 secs, nsecs, totalnsecs;
  
+  useLibHdfs_ = lobGlobals->useLibHdfs_;
   if (lobStorageLocation) 
     {
       if (lobStorageLocation_.empty()) 
@@ -148,12 +155,21 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, 
Ex_Lob_Mode mode,
   hdfsPort_ = hdfsPort;
   // lobLocation_ = lobLocation;
   clock_gettime(CLOCK_MONOTONIC, &startTime);
-  
-  if (lobGlobals->getHdfsFs() == NULL)     
-    return LOB_HDFS_CONNECT_ERROR;
-  else 
-    fs_ = lobGlobals->getHdfsFs();
-    
+  lobGlobalHeap_ = lobGlobals->getHeap();    
+  HDFS_Client_RetCode hdfsClientRetcode;
+  if (useLibHdfs_) { 
+     if (lobGlobals->getHdfsFs() == NULL)     
+        return LOB_HDFS_CONNECT_ERROR;
+     else 
+        fs_ = lobGlobals->getHdfsFs();
+     hdfsClient_ = NULL;
+  }
+  else {
+     hdfsClient_ = HdfsClient::newInstance(lobGlobalHeap_, NULL, 
hdfsClientRetcode);
+     fs_ = NULL;
+     if (hdfsClient_ == NULL)
+        return LOB_HDFS_CONNECT_ERROR;
+  }
 
   clock_gettime(CLOCK_MONOTONIC, &endTime);
 
@@ -166,7 +182,22 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, 
Ex_Lob_Mode mode,
     }
   totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
   stats_.hdfsConnectionTime += totalnsecs;
-    
+   
+  if (! useLibHdfs_) {
+     if (mode == EX_LOB_CREATE) {
+        hdfsClientRetcode = hdfsClient_->hdfsCreate(lobDataFile_.data(), 
FALSE, FALSE); 
+        if (hdfsClientRetcode != HDFS_CLIENT_OK)
+            return LOB_DATA_FILE_CREATE_ERROR;
+     }
+     else {
+        hdfsClientRetcode = hdfsClient_->hdfsOpen(lobDataFile_.data(), FALSE);
+        if (hdfsClientRetcode != HDFS_CLIENT_OK)
+            return LOB_DATA_FILE_OPEN_ERROR;
+     }
+     fdData_ = NULL;
+  }
+  else
+  {
   if (mode == EX_LOB_CREATE) 
     { 
       // check if file is already created
@@ -186,7 +217,7 @@ Ex_Lob_Error ExLob::initialize(const char *lobFile, 
Ex_Lob_Mode mode,
       fdData_ = NULL;
      
     }
-  lobGlobalHeap_ = lobGlobals->getHeap();    
+  }
   return LOB_OPER_OK;
     
 }
@@ -297,6 +328,16 @@ Ex_Lob_Error ExLob::getDesc(ExLobDesc &desc,char * 
handleIn, Int32 handleInLen,
 Ex_Lob_Error ExLob::writeData(Int64 offset, char *data, Int32 size, Int64 
&operLen)
 { 
     Ex_Lob_Error err;
+    HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK;
+    Int64 writeOffset;
+
+    if (! useLibHdfs_ ) {
+       writeOffset = hdfsClient_->hdfsWriteImmediate(data, size, 
hdfsClientRetcode); 
+       if (hdfsClientRetcode != HDFS_CLIENT_OK)
+          return LOB_DATA_WRITE_ERROR;
+       operLen = size;
+       return LOB_OPER_OK;
+    }
     lobDebugInfo("In ExLob::writeData",0,__LINE__,lobTrace_);
     if (!fdData_ || (openFlags_ != (O_WRONLY | O_APPEND))) // file is not open 
for write
     {
@@ -329,7 +370,8 @@ Ex_Lob_Error ExLob::writeDataSimple(char *data, Int64 size, 
LobsSubOper subOpera
                                     int bufferSize , short replication , int 
blockSize)
 { 
     Ex_Lob_Error err;
-
+    if (! useLibHdfs_)
+       return writeData(0,data, size, operLen);
 
     if (!fdData_ || (openFlags_ != (O_WRONLY | O_APPEND))) // file is not open 
for write
     {
@@ -424,7 +466,14 @@ Ex_Lob_Error ExLob::emptyDirectory(char *dirPath,
                                    ExLobGlobals *lobGlobals)
 {
   int retcode = 0;
-
+  HDFS_Client_RetCode hdfsClientRetcode;
+  if (! useLibHdfs_) {
+     hdfsClientRetcode = HdfsClient::hdfsDeletePath(dirPath);
+     if (hdfsClientRetcode != HDFS_CLIENT_OK)
+        return LOB_DATA_FILE_DELETE_ERROR;
+     return LOB_OPER_OK;
+  }
+      
   hdfsFileInfo *fileInfos = hdfsGetPathInfo(fs_, dirPath);
   if (fileInfos == NULL)
     {
@@ -521,8 +570,16 @@ Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 
&sourceEOF)
    lobDebugInfo("In ExLob::statSourceFile",0,__LINE__,lobTrace_);
    // check if the source file is a hdfs file or from local file system.
   LobInputOutputFileType srcType = fileType(srcfile);
+   HDFS_Client_RetCode hdfsClientRetcode;
    if (srcType == HDFS_FILE)
      {
+       if (! useLibHdfs_) {
+          sourceEOF = HdfsClient::hdfsSize(srcfile, hdfsClientRetcode);
+          if (hdfsClientRetcode != HDFS_CLIENT_OK)
+             return LOB_SOURCE_FILE_OPEN_ERROR;
+          ex_assert(sourceEOF >= 0, "Offset is -1 possibly due to path being 
directory");
+       }
+       else {
        hdfsFile sourceFile = hdfsOpenFile(fs_,srcfile,O_RDONLY,0,0,0);   
        if (!sourceFile)                   
          return LOB_SOURCE_FILE_OPEN_ERROR;
@@ -536,8 +593,8 @@ Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 
&sourceEOF)
        
        str_sprintf(logBuf,"Returning EOF of %ld for file %s", 
sourceEOF,srcfile);
        lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
+       }
      }
-
    else if (srcType == LOCAL_FILE)
      {
        int openFlags = O_RDONLY;
@@ -554,11 +611,7 @@ Ex_Lob_Error ExLob::statSourceFile(char *srcfile, Int64 
&sourceEOF)
        if (stat(srcfile, &statbuf) != 0) {
         return LOB_SOURCE_FILE_STAT_ERROR;
        }
-
        sourceEOF = statbuf.st_size;
-
-       
-
        flock(fdSrcFile, LOCK_UN);
        close(fdSrcFile);
        str_sprintf(logBuf,"Returning EOF of %ld for file %s", 
sourceEOF,srcfile);
@@ -638,8 +691,28 @@ Ex_Lob_Error ExLob::readHdfsSourceFile(char *srcfile, char 
*&fileData, Int32 &si
      char logBuf[4096];
      str_sprintf(logBuf,"Calling ::readHdfsSourceFile: %s Offset:%ld, Size: 
%d",srcfile, offset,size);
      lobDebugInfo(logBuf, 0,__LINE__,lobTrace_);
-  
-   
+     HDFS_Client_RetCode hdfsClientRetcode; 
+     Int64 bytesRead;
+     if (!useLibHdfs_) {
+        HdfsClient *srcHdfsClient = 
HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
+        ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: 
HdfsClient::newInstance returned an error");
+        hdfsClientRetcode  = srcHdfsClient->hdfsOpen(srcfile, FALSE);
+        if (hdfsClientRetcode != HDFS_CLIENT_OK)
+           return LOB_SOURCE_FILE_OPEN_ERROR;
+        fileData = (char *) (getLobGlobalHeap())->allocateMemory(size);
+        if (fileData == (char *)-1) 
+           return LOB_SOURCE_DATA_ALLOC_ERROR;
+        bytesRead = srcHdfsClient->hdfsRead(offset, fileData, size, 
hdfsClientRetcode);
+        if (hdfsClientRetcode != HDFS_CLIENT_OK) {
+           HdfsClient::deleteInstance(srcHdfsClient);
+           getLobGlobalHeap()->deallocateMemory(fileData);
+           return LOB_SOURCE_FILE_READ_ERROR;
+        }  
+        size = bytesRead;
+        // Memory growth/leak
+        HdfsClient::deleteInstance(srcHdfsClient);
+        return LOB_OPER_OK;
+     }
      int openFlags = O_RDONLY;
      hdfsFile fdSrcFile = hdfsOpenFile(fs_,srcfile, openFlags,0,0,0);
      if (fdSrcFile == NULL) 
@@ -662,6 +735,7 @@ Ex_Lob_Error ExLob::readHdfsSourceFile(char *srcfile, char 
*&fileData, Int32 &si
      
      return LOB_OPER_OK;
  }
+
 Ex_Lob_Error ExLob::readLocalSourceFile(char *srcfile, char *&fileData, Int32 
&size, Int64 offset)
    {  
      char logBuf[4096];
@@ -952,11 +1026,14 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 
sourceLen, LobsSubOper subO
     Ex_Lob_Error err=LOB_OPER_OK; 
     char logBuf[4096];
     lobDebugInfo("In ExLob::writeLobData",0,__LINE__,lobTrace_);
+    HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK;
+    Int64 writeOffset;
+
     char *inputAddr = source;
     Int64 readOffset = 0;
     Int32 allocMemSize = 0;
     Int64 inputSize = sourceLen;
-    Int64 writeOffset = tgtOffset;
+    writeOffset = tgtOffset;
     if (subOperation == Lob_External_File)
       return LOB_OPER_OK;
     while(inputSize > 0)
@@ -1003,8 +1080,10 @@ Ex_Lob_Error ExLob::writeLobData(char *source, Int64 
sourceLen, LobsSubOper subO
          }
       }
     lobDebugInfo("Leaving ExLob::writeLobData",0,__LINE__,lobTrace_);  
-    hdfsCloseFile(fs_, fdData_);
-    fdData_=NULL;
+    if (useLibHdfs_) {
+       hdfsCloseFile(fs_, fdData_);
+       fdData_=NULL;
+    }
     return err;
 }
 
@@ -1439,6 +1518,13 @@ Ex_Lob_Error ExLob::delDesc(char *handleIn, Int32 
handleInLen, Int64 transId)
 Ex_Lob_Error ExLob::purgeLob()
 {
     char logBuf[4096];
+    if (! useLibHdfs_) {
+       HDFS_Client_RetCode hdfsClientRetcode;
+       hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobDataFile_.data());
+       if (hdfsClientRetcode != HDFS_CLIENT_OK)
+         return LOB_DATA_FILE_DELETE_ERROR;
+       return LOB_OPER_OK;
+    }
      if (hdfsDelete(fs_, lobDataFile_.data(), 0) != 0)
        {
          // extract a substring small enough to fit into logBuf
@@ -1693,6 +1779,26 @@ Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 
&descNum, Int64 &dataOffset,
     char logBuf[4096];
     lobDebugInfo("In ExLob::allocateDesc",0,__LINE__,lobTrace_);
     Int32 openFlags = O_RDONLY ;   
+    HDFS_Client_RetCode hdfsClientRetcode;
+
+    if (! useLibHdfs_) {
+       if (size == 0) {
+          hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobDataFile_.data());
+          if (hdfsClientRetcode != HDFS_CLIENT_OK)
+             return LOB_DATA_FILE_WRITE_ERROR;
+          else {
+             dataOffset = 0;
+             return LOB_OPER_OK; 
+          }
+       }
+       else {
+         dataOffset = hdfsClient_->hdfsSize(hdfsClientRetcode); 
+         if (hdfsClientRetcode != HDFS_CLIENT_OK)
+            return LOB_DATA_FILE_WRITE_ERROR;
+         ex_assert(dataOffset >= 0, "Offset is -1 possibly due to path being 
directory");
+         return  LOB_OPER_OK;
+       }
+    }
     if (size == 0) //we are trying to empty this lob.
       {
         //rename lob datafile
@@ -1793,12 +1899,13 @@ Ex_Lob_Error ExLob::allocateDesc(ULng32 size, Int64 
&descNum, Int64 &dataOffset,
      
       return LOB_OPER_OK;    
 }
+
 Ex_Lob_Error ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry 
*dcArray,Int32 numEntries)
 {
   Ex_Lob_Error rc = LOB_OPER_OK;
   char logBuf[4096];
   lobDebugInfo("In ExLob::compactLobDataFile",0,__LINE__,lobTrace_);
-  Int64 maxMemChunk = 100*1024*1024; //100 MB limit for intermediate buffer 
for transfering data
+  Int64 maxMemChunk = 64*1024*1024; //64 MB limit for intermediate buffer for 
transfering data
 
   // make some temporary file names
   size_t len = lobDataFile_.length();
@@ -1827,14 +1934,15 @@ Ex_Lob_Error 
ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int
               lobDataFileSubstr,tmpLobDataFileSubstr, saveLobDataFileSubstr);
 
   lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
-  hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_);
-  if (fs == NULL)
-    return LOB_DATA_FILE_OPEN_ERROR;
-  
- 
-  hdfsFile  fdData = hdfsOpenFile(fs, lobDataFile_.data(), O_RDONLY, 0, 0,0);
+
+  HDFS_Client_RetCode hdfsClientRetcode = HDFS_CLIENT_OK;
+  HdfsClient *srcHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), 
NULL, hdfsClientRetcode);
+  ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: 
HdfsClient::newInstance returned an error");
+  HdfsClient *dstHdfsClient = HdfsClient::newInstance(getLobGlobalHeap(), 
NULL, hdfsClientRetcode);
+  ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: 
HdfsClient::newInstance returned an error");
   
-  if (!fdData)
+  hdfsClientRetcode  = srcHdfsClient->hdfsOpen(lobDataFile_.data(), FALSE); 
+  if (hdfsClientRetcode != HDFS_CLIENT_OK)
     {
       // extract substring small enough to fit in logBuf
       len = MINOF(lobDataFile_.length(),sizeof(logBuf) - 40); 
@@ -1844,14 +1952,11 @@ Ex_Lob_Error 
ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int
 
       str_sprintf(logBuf,"Could not open file:%s",lobDataFileSubstr2);
       lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
-      hdfsCloseFile(fs,fdData);
-      fdData = NULL;
       return LOB_DATA_FILE_OPEN_ERROR;
     }
-                          
         
-  hdfsFile fdTemp = hdfsOpenFile(fs, tmpLobDataFile,O_WRONLY|O_CREAT,0,0,0);
-  if (!fdTemp) 
+  hdfsClientRetcode  = dstHdfsClient->hdfsCreate(tmpLobDataFile, TRUE, FALSE); 
+  if (hdfsClientRetcode != HDFS_CLIENT_OK)
     {
       // extract substring small enough to fit in logBuf
       len = MINOF(sizeof(tmpLobDataFile),sizeof(logBuf)/3 - 20); 
@@ -1861,8 +1966,8 @@ Ex_Lob_Error 
ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int
 
       str_sprintf(logBuf,"Could not open file:%s",tmpLobDataFileSubstr2);
       lobDebugInfo(logBuf,0,__LINE__,lobTrace_);
-      hdfsCloseFile(fs,fdTemp);
-      fdTemp = NULL;
+      srcHdfsClient->hdfsClose();
+      HdfsClient::deleteInstance(srcHdfsClient);
       return LOB_DATA_FILE_OPEN_ERROR;
     }
 
@@ -1871,112 +1976,86 @@ Ex_Lob_Error 
ExLob::compactLobDataFile(ExLobInMemoryDescChunksEntry *dcArray,Int
    Int64 bytesWritten = 0;
    Int64 size = 0;
    Int64 chunkLen = 0;
+   Int64 readLen = 0;
+   Int64 offset;
    char * tgt = NULL;
-   while (i < numEntries)
-     {
-       chunkLen = dcArray[i].getChunkLen();
-       if (chunkLen > maxMemChunk)
-         {
-           tgt = (char *)(getLobGlobalHeap())->allocateMemory(maxMemChunk);
-           while (chunkLen > maxMemChunk)
-             {             
-               bytesRead = 
hdfsPread(fs,fdData,dcArray[i].getCurrentOffset(),tgt,maxMemChunk);
-               if (bytesRead != maxMemChunk)
-                 {
-                   lobDebugInfo("Problem reading from  data 
file",0,__LINE__,lobTrace_);
-                   getLobGlobalHeap()->deallocateMemory(tgt);
-                   return LOB_DATA_READ_ERROR;
-                 }
-               bytesWritten = hdfsWrite(fs,fdTemp, tgt,maxMemChunk);
-               if (bytesWritten != size)
-                 {
-                   lobDebugInfo("Problem writing temp data 
file",0,__LINE__,lobTrace_);
-                   getLobGlobalHeap()->deallocateMemory(tgt);
-                   return LOB_TARGET_FILE_WRITE_ERROR;
-                 }
-               chunkLen -= maxMemChunk;
-             }
-          
-         }
-       else
-         {
-           tgt = (char *)(getLobGlobalHeap())->allocateMemory(chunkLen);
-            bytesRead = 
hdfsPread(fs,fdData,dcArray[i].getCurrentOffset(),tgt,chunkLen);
-               if (bytesRead != chunkLen)
-                 {
-                   lobDebugInfo("Problem reading from  data 
file",0,__LINE__,lobTrace_);
-                   getLobGlobalHeap()->deallocateMemory(tgt);
-                   return LOB_DATA_READ_ERROR;
-                 }
-               bytesWritten = hdfsWrite(fs,fdTemp, tgt,chunkLen);
-               if (bytesWritten != chunkLen)
-                 {
-                   lobDebugInfo("Problem writing to temp data 
file",0,__LINE__,lobTrace_);
-                   getLobGlobalHeap()->deallocateMemory(tgt);
-                   return LOB_TARGET_FILE_WRITE_ERROR;
-                 }
-         }
-       if (hdfsFlush(fs, fdTemp)) {
-          lobDebugInfo("Problem flushing to temp data 
file",0,__LINE__,lobTrace_);
-         return LOB_DATA_FLUSH_ERROR;
+   Ex_Lob_Error  saveError = LOB_OPER_OK;
+   tgt = (char *)(getLobGlobalHeap())->allocateMemory(maxMemChunk);
+   while ((i < numEntries) && (saveError == LOB_OPER_OK)) 
+   {
+       readLen = dcArray[i].getChunkLen();
+       offset = dcArray[i].getCurrentOffset(); 
+       while (readLen > 0) 
+       {             
+          if (readLen > maxMemChunk)
+             chunkLen = maxMemChunk;
+          else
+             chunkLen = readLen;  
+          bytesRead = srcHdfsClient->hdfsRead(offset, tgt, chunkLen, 
hdfsClientRetcode);
+          if (hdfsClientRetcode != HDFS_CLIENT_OK) {
+             lobDebugInfo("Problem reading from  data 
file",0,__LINE__,lobTrace_);
+             saveError = LOB_SOURCE_FILE_READ_ERROR;
+             break;
+          }
+          bytesWritten = dstHdfsClient->hdfsWrite(tgt, chunkLen, 
hdfsClientRetcode);
+          if (hdfsClientRetcode != HDFS_CLIENT_OK || bytesWritten != chunkLen) 
{
+             lobDebugInfo("Problem writing temp data 
file",0,__LINE__,lobTrace_);
+             saveError = LOB_DATA_FILE_WRITE_ERROR;
+             break;
+          }
+          readLen -= chunkLen;
+          offset += chunkLen;
        }
-       getLobGlobalHeap()->deallocateMemory(tgt);
        i++;
-     }
-   hdfsCloseFile(fs,fdTemp);
-   hdfsCloseFile(fs,fdData);
-  
+   }
+   getLobGlobalHeap()->deallocateMemory(tgt);
+   srcHdfsClient->hdfsClose(); 
+   dstHdfsClient->hdfsClose();  
+   HdfsClient::deleteInstance(srcHdfsClient);
+   HdfsClient::deleteInstance(dstHdfsClient);
+   if (saveError != LOB_OPER_OK)
+      return saveError;
    //Now save the data file and rename the tempfile to the original datafile
 
-   Int32 rc2 = hdfsRename(fs,lobDataFile_.data(),saveLobDataFile);
-   if (rc2 == -1)
-     {
+   hdfsClientRetcode = 
HdfsClient::hdfsRename(lobDataFile_.data(),saveLobDataFile);
+   if (hdfsClientRetcode != HDFS_CLIENT_OK) {
        lobDebugInfo("Problem renaming datafile to save data 
file",0,__LINE__,lobTrace_);
        return LOB_DATA_FILE_WRITE_ERROR;
-     }
-   rc2 = hdfsRename(fs,tmpLobDataFile, lobDataFile_.data());
-   if (rc2 == -1)
-     {
+   }
+   hdfsClientRetcode = HdfsClient::hdfsRename(tmpLobDataFile, 
lobDataFile_.data());
+   if (hdfsClientRetcode != HDFS_CLIENT_OK) {
        lobDebugInfo("Problem renaming temp datafile to data 
file",0,__LINE__,lobTrace_);
        return LOB_DATA_FILE_WRITE_ERROR;
-     }
+   }
    return LOB_OPER_OK;
 }
 
 Ex_Lob_Error ExLob::restoreLobDataFile()
 {
-  Ex_Lob_Error rc = LOB_OPER_OK;
   lobDebugInfo("In ExLob::restoreLobDataFile",0,__LINE__,lobTrace_);
-  
-  hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_);
-  if (fs == NULL)
-    return LOB_DATA_FILE_OPEN_ERROR;
+  HDFS_Client_RetCode hdfsClientRetcode; 
   char saveLobDataFile[lobDataFile_.length() + sizeof("_save")]; // sizeof 
includes room for null terminator
   strcpy(saveLobDataFile,lobDataFile_.data());
   strcpy(saveLobDataFile+lobDataFile_.length(),"_save");
-  Int32 rc2 = hdfsDelete(fs,lobDataFile_.data(),FALSE);//ok to ignore error.
-  rc2 = hdfsRename(fs,saveLobDataFile, lobDataFile_.data());
-  if (rc2)
+  hdfsClientRetcode = HdfsClient::hdfsDeletePath(lobDataFile_.data());
+  hdfsClientRetcode = HdfsClient::hdfsRename(saveLobDataFile, 
lobDataFile_.data());
+  if (hdfsClientRetcode != HDFS_CLIENT_OK)
      {
        lobDebugInfo("Problem renaming savedatafile to data 
file",0,__LINE__,lobTrace_);
        return LOB_OPER_ERROR; 
      }
-  return rc;
+  return LOB_OPER_OK;
 
 } 
 
 Ex_Lob_Error ExLob::purgeBackupLobDataFile()
 {
   Ex_Lob_Error rc = LOB_OPER_OK;
-   lobDebugInfo("In ExLob::purgeBackupLobDataFile",0,__LINE__,lobTrace_);
-  hdfsFS fs = hdfsConnect(hdfsServer_,hdfsPort_);
-  if (fs == NULL)
-    return LOB_DATA_FILE_OPEN_ERROR;
+  lobDebugInfo("In ExLob::purgeBackupLobDataFile",0,__LINE__,lobTrace_);
   char saveLobDataFile[lobDataFile_.length() + sizeof("_save")]; // sizeof 
includes room for null terminator
   strcpy(saveLobDataFile,lobDataFile_.data());
   strcpy(saveLobDataFile+lobDataFile_.length(),"_save");
-  Int32 rc2 = hdfsDelete(fs,saveLobDataFile,FALSE);//ok to ignore error.
-   
+  HDFS_Client_RetCode hdfsClientRetcode  = 
HdfsClient::hdfsDeletePath(saveLobDataFile);//ok to ignore error.
   return rc;
 }
 ///////////////////////////////////////////////////////////////////////////////
@@ -2055,6 +2134,29 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 
tgtSize, cursor_t &cursor, I
       bytesAvailable = cursor.descSize_ - cursor.bytesRead_;
       bytesToCopy = min(bytesAvailable, tgtSize - operLen);
       offset = cursor.descOffset_ + cursor.bytesRead_;
+      if (!useLibHdfs_) {
+         HDFS_Client_RetCode hdfsClientRetcode;
+         if (storage_ == Lob_External_HDFS_File) {
+            HdfsClient *srcHdfsClient = 
HdfsClient::newInstance(getLobGlobalHeap(), NULL, hdfsClientRetcode);
+            ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: 
HdfsClient::newInstance returned an error");
+            hdfsClientRetcode  = srcHdfsClient->hdfsOpen(lobDataFile_.data(), 
FALSE);
+            if (hdfsClientRetcode != HDFS_CLIENT_OK)
+               return LOB_SOURCE_FILE_OPEN_ERROR;
+            bytesRead = srcHdfsClient->hdfsRead(offset, tgt, bytesToCopy, 
hdfsClientRetcode);
+            if (hdfsClientRetcode != HDFS_CLIENT_OK) {
+               HdfsClient::deleteInstance(srcHdfsClient);
+               return LOB_SOURCE_FILE_READ_ERROR;
+            } 
+            HdfsClient::deleteInstance(srcHdfsClient);
+          }
+          else {
+               bytesRead = hdfsClient_->hdfsRead(offset, tgt, bytesToCopy, 
hdfsClientRetcode);
+               if (hdfsClientRetcode != HDFS_CLIENT_OK)
+                  return LOB_DATA_READ_ERROR;
+          }
+      }
+      else {
+
       // #endif
 
       if (!fdData_ || (openFlags_ != O_RDONLY)) 
@@ -2089,7 +2191,7 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 
tgtSize, cursor_t &cursor, I
       }
       Int64 totalnsecs = (secs * NUM_NSECS_IN_SEC) + nsecs;
       stats_.CumulativeReadTime += totalnsecs;
-
+      } // useLibHdfs
       if (bytesRead == -1) {
          return LOB_DATA_READ_ERROR;
       } else if (bytesRead == 0) {
@@ -2102,9 +2204,10 @@ Ex_Lob_Error ExLob::readCursorData(char *tgt, Int64 
tgtSize, cursor_t &cursor, I
       operLen += bytesRead;
       tgt += bytesRead;
    }
-  
-   hdfsCloseFile(fs_, fdData_);
-   fdData_ = NULL;
+   if (useLibHdfs_) {
+      hdfsCloseFile(fs_, fdData_);
+      fdData_ = NULL;
+   }
    return LOB_OPER_OK;
 }
 
@@ -2133,6 +2236,8 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr,
     return err;
   
 
+  if (useLibHdfs_) 
+  {
   if (fdData_)// we may have a stale handle. close and open to refresh 
     {
       hdfsCloseFile(fs_, fdData_);
@@ -2161,10 +2266,19 @@ Ex_Lob_Error ExLob::readDataToMem(char *memAddr,
           
     }
        
-
+  } // useLibHdfs_
      
   if (!multipleChunks)
     {
+      if (! useLibHdfs_) {
+         HDFS_Client_RetCode hdfsClientRetcode;
+         Int32 readLen;
+         readLen = hdfsClient_->hdfsRead(offset, memAddr, size, 
hdfsClientRetcode);
+         if (hdfsClientRetcode != HDFS_CLIENT_OK)
+            return LOB_DATA_READ_ERROR;
+         operLen = readLen;
+         return LOB_OPER_OK;
+      }
       lobDebugInfo("Reading in single chunk",0,__LINE__,lobTrace_);
       if ((bytesRead = hdfsPread(fs_, fdData_, offset, 
                                 memAddr, size)) == -1) {
@@ -2972,28 +3086,8 @@ if (!lobGlobals->isHive() )
 
 void cleanupLOBDataDescFiles(const char *lobHdfsServer,int lobHdfsPort,const 
char *lobHdfsLoc)
 { 
-  int numExistingFiles=0;
-  hdfsFS fs;
-  int err = 0;
-  fs = hdfsConnect(lobHdfsServer, lobHdfsPort);
-  if (fs == NULL)
-    return;
-  // Get this list of all data and desc files in the lob sotrage location
-  hdfsFileInfo *fileInfos = hdfsListDirectory(fs, lobHdfsLoc, 
&numExistingFiles);
-  if (fileInfos == NULL)
-      return ;
-    
-  //Delete each one in a loop
-  for (int i = 0; i < numExistingFiles; i++)  
-    {    
-      err = hdfsDelete(fs, fileInfos[i].mName, 0);
-    }
-    
-  // *Note* : delete the memory allocated by libhdfs for the file info array  
-  if (fileInfos)
-    {
-      hdfsFreeFileInfo(fileInfos, numExistingFiles);
-    }
+  HDFS_Client_RetCode hdfsClientRetcode  = 
HdfsClient::hdfsDeletePath(lobHdfsLoc);//ok to ignore error.
+  return;
 }
 
 
@@ -3392,7 +3486,8 @@ ExLobGlobals::ExLobGlobals(NAHeap *lobHeap) :
     threadTraceFile_(NULL),
     lobTrace_(FALSE),
     numWorkerThreads_(0),
-    heap_(lobHeap)
+    heap_(lobHeap),
+    useLibHdfs_(FALSE)
 {
   //initialize the log file
   if (getenv("TRACE_HDFS_THREAD_ACTIONS"))
@@ -3403,6 +3498,12 @@ ExLobGlobals::ExLobGlobals(NAHeap *lobHeap) :
     }
   if(getenv("TRACE_LOB_ACTIONS"))
     lobTrace_ = TRUE;
+  char *useLibHdfsStr = getenv("USE_LIBHDFS");
+  int useLibHdfs = 0;
+  if (useLibHdfsStr != NULL) 
+     useLibHdfs = atoi(useLibHdfsStr);
+  if (useLibHdfs != 0) 
+      useLibHdfs_ = TRUE;
 }
 
 ExLobGlobals::~ExLobGlobals()
@@ -3481,10 +3582,17 @@ ExLobGlobals::~ExLobGlobals()
     //delete the lobMap AFTER the worker threads have finished their pending 
     //work since they may still be using an objetc that was fetched off the 
lobMap_
     if (lobMap_) 
-      {
+    {
+        lobMap_it it2; 
+        for (it2 = lobMap_->begin(); it2 != lobMap_->end() ; ++it2)
+        {
+           ExLob *lobPtr = it2->second; 
+           NADELETE(lobPtr, ExLob, heap_);
+        } 
+        lobMap_->clear();
         NADELETE(lobMap_,lobMap_t,heap_);
         lobMap_ = NULL;
-      }
+    }
     
     //msg_mon_close_process(&serverPhandle);
     if (threadTraceFile_)

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOBaccess.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBaccess.h b/core/sql/exp/ExpLOBaccess.h
index 3c5e93e..416c3cc 100644
--- a/core/sql/exp/ExpLOBaccess.h
+++ b/core/sql/exp/ExpLOBaccess.h
@@ -397,6 +397,10 @@ class ExLob : public NABasicObject
     ExLob(NAHeap * heap);  // default constructor
     virtual ~ExLob(); // default desctructor
 
+    void setUseLibHdfs(NABoolean useLibHdfs)
+       { useLibHdfs_ = useLibHdfs; } 
+
+    NABoolean useLibHdfs() { return useLibHdfs_; }
     Ex_Lob_Error initialize(const char *lobFile, Ex_Lob_Mode mode, char *dir, 
                             LobsStorage storage, char *hdfsServer, Int64 
hdfsPort,
                             char *lobLocation,
@@ -515,6 +519,8 @@ class ExLob : public NABasicObject
     bool prefetchQueued_;
     NAHeap *lobGlobalHeap_;
     NABoolean lobTrace_;
+    NABoolean useLibHdfs_;
+    HdfsClient *hdfsClient_;
 };
 
 typedef map<string, ExLob *> lobMap_t;
@@ -619,6 +625,11 @@ class ExLobGlobals
     {
       return heap_;
     }
+    
+    NABoolean useLibHdfs() { return useLibHdfs_; }
+    void setUseLibHdfs(NABoolean useLibHdfs)
+       { useLibHdfs_ = useLibHdfs; } 
+
   void traceMessage(const char *logMessage, ExLobCursor *c, int line);
   
   public :
@@ -639,6 +650,7 @@ class ExLobGlobals
     NAHeap *heap_;
     NABoolean lobTrace_;
     long numWorkerThreads_; 
+    NABoolean useLibHdfs_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/exp/ExpLOBinterface.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBinterface.cpp b/core/sql/exp/ExpLOBinterface.cpp
index 1a6d6ed..a90a0cb 100644
--- a/core/sql/exp/ExpLOBinterface.cpp
+++ b/core/sql/exp/ExpLOBinterface.cpp
@@ -65,16 +65,17 @@ Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap 
* parentHeap,
                   0);
   if (exLobGlob)
     {
-    
-      if (isHiveRead)
+      if (exLobGlob->useLibHdfs_) 
+      {
+        if (isHiveRead)
         {
           ((ExLobGlobals *)exLobGlob)->startWorkerThreads();
           lobHeap->setThreadSafe();
         }
-      
-      
+      }
     }
-
+  if (exLobGlob->useLibHdfs_)
+  {
   //set hdfsConnection from context global 
   ContextCli *localContext = (ContextCli *)currContext;
   if (localContext)
@@ -89,7 +90,8 @@ Lng32 ExpLOBinterfaceInit(ExLobGlobals *& exLobGlob, NAHeap * 
parentHeap,
           ((ExLobGlobals *)exLobGlob)->setHdfsFs(fs);
         }
     }
-
+  }
+ 
   if (err != LOB_OPER_OK)
     return err;
   else

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/regress/executor/EXPECTED130
----------------------------------------------------------------------
diff --git a/core/sql/regress/executor/EXPECTED130 
b/core/sql/regress/executor/EXPECTED130
index fa69e88..efedb9c 100644
--- a/core/sql/regress/executor/EXPECTED130
+++ b/core/sql/regress/executor/EXPECTED130
@@ -652,12 +652,12 @@ And the dish ran away with the fork !
 >>
 >>insert into tlob130txt_limit50 values(1,filetolob('lob_input_e1.txt'));
 
-*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to 
ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail 0.
+*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to 
ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail: 0. 
Cause: .
 
 --- 0 row(s) inserted.
 >>insert into tlob130bin_limit1K values(1,filetolob('anoush.jpg'));
 
-*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to 
ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail 0.
+*** ERROR[8442] Unable to access ExpLOBInterfaceInsert interface. Call to 
ExpLOBInterfaceInsert returned error LOB_MAX_LIMIT_ERROR(560). Error detail: 0. 
Cause: .
 
 --- 0 row(s) inserted.
 >>--test extract 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/52f074af/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 2f24dce..7d1b43b 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -292,6 +292,7 @@ public class HDFSClient
 
    boolean hdfsCreate(String fname , boolean overwrite, boolean compress) 
throws IOException
    {
+      filename_ = fname;
       if (logger_.isDebugEnabled()) 
         logger_.debug("HDFSClient.hdfsCreate() - started" );
       if (!compress || (compress && fname.endsWith(".gz")))
@@ -302,27 +303,17 @@ public class HDFSClient
       fs_ = FileSystem.get(filepath_.toUri(),config_);
       compressed_ = compress;
       fsdis_ = null;      
-      FSDataOutputStream fsOut;
+      FSDataOutputStream fsOut = null;
       if (overwrite)
          fsOut = fs_.create(filepath_);
-      else
-      if (fs_.exists(filepath_))
-         fsOut = fs_.append(filepath_);
-      else
-         fsOut = fs_.create(filepath_);
-
-      if (compressed_) {
-          GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( 
GzipCodec.class, config_);
-          Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
-          outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
-      }
-      else
-         outStream_ = fsOut;
+      if (fsOut != null)
+         fsOut.close();
       return true;
-   }
+   } 
 
    boolean hdfsOpen(String fname , boolean compress) throws IOException
    {
+      filename_ = fname;
       if (logger_.isDebugEnabled()) 
          logger_.debug("HDFSClient.hdfsOpen() - started" );
       if (!compress || (compress && fname.endsWith(".gz")))
@@ -335,6 +326,43 @@ public class HDFSClient
       fsdis_ = null;      
       return true;
     }
+
+    long hdfsSize() throws IOException
+    {
+       FileStatus filestatus;
+       try 
+       {
+          filestatus = fs_.getFileStatus(filepath_);
+       } catch (java.io.FileNotFoundException e)
+       {
+          return 0;
+       }
+       if (filestatus.isFile())
+          return filestatus.getLen();
+       else
+          return -1;
+    }
+
+    long hdfsWriteImmediate(byte[] buff) throws IOException
+    {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsWriteClose() - started" );
+      FSDataOutputStream fsOut;
+      FileStatus filestatus;
+      long writeOffset; 
+      if (fs_.exists(filepath_)) {
+         filestatus = fs_.getFileStatus(filepath_);
+         fsOut = fs_.append(filepath_);
+         writeOffset = filestatus.getLen(); 
+      }
+      else {
+         fsOut = fs_.create(filepath_);
+         writeOffset = 0;
+      }
+      fsOut.write(buff);
+      fsOut.close();
+      return writeOffset;
+    }
     
     int hdfsWrite(byte[] buff) throws IOException
     {
@@ -359,16 +387,19 @@ public class HDFSClient
             logger_.debug("HDFSClient.hdfsWrite() - output stream created" );
       }
       outStream_.write(buff);
+      if (outStream_ instanceof FSDataOutputStream)
+         ((FSDataOutputStream)outStream_).hsync();
       if (logger_.isDebugEnabled()) 
          logger_.debug("HDFSClient.hdfsWrite() - bytes written " + 
buff.length);
       return buff.length;
     }
 
-    int hdfsRead(ByteBuffer buffer) throws IOException
+    int hdfsRead(long pos, ByteBuffer buffer) throws IOException
     {
       if (logger_.isDebugEnabled()) 
          logger_.debug("HDFSClient.hdfsRead() - started" );
       if (fsdis_ == null && inStream_ == null ) {
+         try {
          codec_ = codecFactory_.getCodec(filepath_);
          if (codec_ != null) {
             compressed_ = true;
@@ -376,19 +407,34 @@ public class HDFSClient
          }
          else
             fsdis_ = fs_.open(filepath_);
-         pos_ = 0;
+         } catch (java.io.FileNotFoundException e) {
+            return 0;
+         }
       }
       int lenRemain;   
       int bytesRead;
       int totalBytesRead = 0;
       int bufLen;
       int bufOffset = 0;
+      if (compressed_) {
+         if (pos != 0 && pos != -1)
+            throw new IOException("Compressed files can be read from a 
non-zero position");
+         else
+            pos_ = 0;
+      }
+      else
+      if (pos != -1) 
+         pos_ = pos;
       if (compressed_ && bufArray_ != null) 
          bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
       if (buffer.hasArray())
          bufLen = buffer.array().length;
       else
+      {
+         if (pos_ != -1)
+            fsdis_.seek(pos_);
          bufLen = buffer.capacity();
+      }
       lenRemain = bufLen;
       do
       {
@@ -413,7 +459,6 @@ public class HDFSClient
     {
       if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - 
started" );
       if (outStream_ != null) {
-          outStream_.flush();
           outStream_.close();
           outStream_ = null;
       }
@@ -422,7 +467,24 @@ public class HDFSClient
       return true;
     }
 
-    
+    static long hdfsSize(String filename) throws IOException
+    {
+       Path filepath = new Path(filename);
+       FileSystem fs = FileSystem.get(filepath.toUri(),config_);
+       FileStatus filestatus;
+       try
+       {
+          filestatus = fs.getFileStatus(filepath);
+       } catch (java.io.FileNotFoundException e)
+       {
+          return 0;
+       } 
+       if (filestatus.isFile())
+          return filestatus.getLen();
+       else
+          return -1;
+    } 
+
     public static boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) 
throws IOException
     {
       if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() 
- start");
@@ -547,7 +609,7 @@ public class HDFSClient
          return 0;
    }
 
-
+   
    public void stop() throws IOException
    {
       if (future_ != null) {
@@ -669,6 +731,17 @@ public class HDFSClient
       return true;
    }
 
+   public static boolean hdfsRename(String fromPathStr, String toPathStr) 
throws IOException 
+   {
+      if (logger_.isDebugEnabled()) 
+         logger_.debug("HDFSClient.hdfsRename(" + fromPathStr + ", " + 
toPathStr + ")");
+      Path fromPath = new Path(fromPathStr );
+      Path toPath = new Path(toPathStr );
+      FileSystem fs = FileSystem.get(fromPath.toUri(), config_);
+      fs.rename(fromPath, toPath);
+      return true;
+   }
+
    private native int sendFileStatus(long jniObj, int numFiles, int fileNo, 
boolean isDir, 
                         String filename, long modTime, long len,
                         short numReplicas, long blockSize, String owner, 
String group,

Reply via email to