Repository: trafodion Updated Branches: refs/heads/master 05d92ae7b -> c29ebc6e5
[TRAFODION-3009] Streamline error handling in Executor utility commands Changes to avoid allocation of ComDiagsArea in some more places unless it is needed to pass errors or warnings. [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables Disabling USE_LIBHDFS_SCAN by default. The new implementation of Hdfs Scan is now used to scan the text type hive files. Hdfs Scan is now stopped gracefully when the hive scan is cancelled. This avoids the random core seen with new implementation. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/060bfc6c Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/060bfc6c Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/060bfc6c Branch: refs/heads/master Commit: 060bfc6c41277907af523d7bb62e02eb91b5bc86 Parents: 6e39af2 Author: selvaganesang <selva.govindara...@esgyn.com> Authored: Tue Apr 17 05:09:04 2018 +0000 Committer: selvaganesang <selva.govindara...@esgyn.com> Committed: Tue Apr 17 05:09:04 2018 +0000 ---------------------------------------------------------------------- core/sql/executor/ExExeUtilCli.cpp | 4 +- core/sql/executor/ExExeUtilLoad.cpp | 202 ++----------------- core/sql/executor/ExHdfsScan.cpp | 2 + core/sql/sqlcomp/nadefaults.cpp | 2 +- .../main/java/org/trafodion/sql/HDFSClient.java | 2 +- 5 files changed, 27 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/executor/ExExeUtilCli.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilCli.cpp b/core/sql/executor/ExExeUtilCli.cpp index aa1553b..eb7ae04 100644 --- a/core/sql/executor/ExExeUtilCli.cpp +++ b/core/sql/executor/ExExeUtilCli.cpp @@ -1094,7 +1094,7 @@ Lng32 ExeCliInterface::executeImmediate(const char * stmtStr, Lng32 retcode = 0; ComDiagsArea * tempDiags = NULL; - if (globalDiags != NULL && *globalDiags != NULL) + if (globalDiags != NULL && *globalDiags != NULL && (*globalDiags)->getNumber() > 0) { tempDiags = ComDiagsArea::allocate(heap_); tempDiags->mergeAfter(**globalDiags); @@ -1119,7 +1119,7 @@ ExecuteImmediateReturn: { // Allocate the diagnostics area if needed // and populate the diagnostics conditions - if (*globalDiags == NULL && retcode != 0) { + if (*globalDiags == NULL && retcode != 0 && retcode != 100) { *globalDiags = ComDiagsArea::allocate(getHeap()); SQL_EXEC_MergeDiagnostics_Internal(**globalDiags); } http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/executor/ExExeUtilLoad.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index 5f33b72..5024f8c 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -124,9 +124,7 @@ short ExExeUtilCreateTableAsTcb::work() ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals(); ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals(); ContextCli *currContext = masterGlob->getStatement()->getContext(); - - ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()-> - castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction(); + ExTransaction *ta = currContext->getTransaction(); while (1) { @@ -535,69 +533,24 @@ short ExExeUtilCreateTableAsTcb::work() case DONE_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = - pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_NO_DATA; - - // insert into parent - qparent_.up->insert(); - + retcode = handleDone(); + if (retcode == 1) + return WORK_OK; step_ = INITIAL_; - qparent_.down->removeHead(); - return WORK_OK; } break; case ERROR_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = - pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_SQLERROR; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = - ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap()); - else - diagsArea->incrRefCount (); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - - up_entry->setDiagsArea (diagsArea); - - // insert into parent - qparent_.up->insert(); - - pstate.matches_ = 0; - + retcode = handleError(); + if (retcode == 1) + return WORK_OK; step_ = DONE_; } break; - - } // switch } // while - - return WORK_OK; @@ -1679,74 +1632,20 @@ short ExExeUtilHBaseBulkLoadTcb::work() case DONE_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_NO_DATA; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - + retcode = handleDone(); + if (retcode == 1) + return WORK_OK; masterGlob->setRowsAffected(rowsAffected_); - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); step_ = INITIAL_; - qparent_.down->removeHead(); return WORK_OK; } break; case LOAD_ERROR_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_SQLERROR; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - { - diagsArea->mergeAfter(*getDiagsArea()); - diagsArea->setRowCount(rowsAffected_); - } - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); - - pstate.matches_ = 0; - - - + retcode = handleError(); + if (retcode == 1) + return WORK_OK; step_ = DONE_; } break; @@ -2153,9 +2052,8 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() ex_queue_entry * pentry_down = qparent_.down->getHeadEntry(); ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate); - - ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()-> - castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction(); + ExMasterStmtGlobals *masterGlob = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals(); + ExTransaction *ta = masterGlob->getStatement()->getContext()->getTransaction(); while (1) { @@ -2177,7 +2075,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() { ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, retcode, "ExpHbaseInterface_JNI::init"); - handleError(); step_ = UNLOAD_END_ERROR_; break; } @@ -2327,7 +2224,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, hbcRetCode, "HBaseClient_JNI::createSnapshot/verifySnapshot", snapshotsList_->at(i)->snapshotName->data() ); - handleError(); step_ = UNLOAD_END_ERROR_; break; } @@ -2389,7 +2285,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, hbcRetCode, "HBaseClient_JNI::createSnapshot/verifySnapshot", snapshotsList_->at(i)->snapshotName->data() ); - handleError(); step_ = UNLOAD_END_ERROR_; break; } @@ -2477,36 +2372,11 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() case DONE_: { - if (qparent_.up->isFull()) + retcode = handleDone(); + if (retcode == 1) return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_NO_DATA; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - diagsArea->setRowCount(rowsAffected_); - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); + masterGlob->setRowsAffected(rowsAffected_); step_ = INITIAL_; - qparent_.down->removeHead(); - freeResources(); return WORK_OK; } @@ -2514,36 +2384,9 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() case UNLOAD_ERROR_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_SQLERROR; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); - - pstate.matches_ = 0; - - - + retcode = handleError(); + if (retcode == 1) + return WORK_OK; step_ = DONE_; } break; @@ -2552,7 +2395,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() } // while return WORK_OK; - } short ExExeUtilHBaseBulkUnLoadTcb::moveRowToUpQueue(const char * row, Lng32 len, @@ -3447,7 +3289,6 @@ short ExExeUtilLobExtractTcb::work() retcode = handleError(); if (retcode == 1) return WORK_OK; - step_ = DONE_; } break; @@ -3457,7 +3298,6 @@ short ExExeUtilLobExtractTcb::work() retcode = handleDone(); if (retcode == 1) return WORK_OK; - step_ = EMPTY_; return WORK_OK; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index f77d714..d09f6cd 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -255,6 +255,8 @@ ExHdfsScanTcb::~ExHdfsScanTcb() void ExHdfsScanTcb::freeResources() { + if (hdfsScan_ != NULL) + hdfsScan_->stop(); if (loggingFileName_ != NULL) { NADELETEBASIC(loggingFileName_, getHeap()); loggingFileName_ = NULL; http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 244a2bc..0d7b41b 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -3033,7 +3033,7 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"), // Use large queues on RHS of Flow/Nested Join when appropriate DDkwd__(USE_LARGE_QUEUES, "ON"), - DDkwd__(USE_LIBHDFS_SCAN, "ON"), + DDkwd__(USE_LIBHDFS_SCAN, "OFF"), DDkwd__(USE_MAINTAIN_CONTROL_TABLE, "OFF"), http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index ff78d3d..0346bef 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -470,7 +470,7 @@ public class HDFSClient { if (future_ != null) { try { - future_.get(200, TimeUnit.MILLISECONDS); + future_.get(30, TimeUnit.SECONDS); } catch(TimeoutException e) { logger_.error("Asynchronous Thread of HdfsScan is Cancelled (timeout), ", e); future_.cancel(true);