Repository: trafodion
Updated Branches:
  refs/heads/master 05d92ae7b -> c29ebc6e5


[TRAFODION-3009] Streamline error handling in Executor utility commands

Changes to avoid allocation of ComDiagsArea in some more places unless it is
needed to pass errors or warnings.

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text 
formatted hive tables

Disabling USE_LIBHDFS_SCAN by default. The new implementation of Hdfs Scan is
now used to scan text-formatted hive files.

Hdfs Scan is now stopped gracefully when the hive scan is cancelled. This
avoids the random core dumps seen with the new implementation.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/060bfc6c
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/060bfc6c
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/060bfc6c

Branch: refs/heads/master
Commit: 060bfc6c41277907af523d7bb62e02eb91b5bc86
Parents: 6e39af2
Author: selvaganesang <selva.govindara...@esgyn.com>
Authored: Tue Apr 17 05:09:04 2018 +0000
Committer: selvaganesang <selva.govindara...@esgyn.com>
Committed: Tue Apr 17 05:09:04 2018 +0000

----------------------------------------------------------------------
 core/sql/executor/ExExeUtilCli.cpp              |   4 +-
 core/sql/executor/ExExeUtilLoad.cpp             | 202 ++-----------------
 core/sql/executor/ExHdfsScan.cpp                |   2 +
 core/sql/sqlcomp/nadefaults.cpp                 |   2 +-
 .../main/java/org/trafodion/sql/HDFSClient.java |   2 +-
 5 files changed, 27 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/executor/ExExeUtilCli.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilCli.cpp 
b/core/sql/executor/ExExeUtilCli.cpp
index aa1553b..eb7ae04 100644
--- a/core/sql/executor/ExExeUtilCli.cpp
+++ b/core/sql/executor/ExExeUtilCli.cpp
@@ -1094,7 +1094,7 @@ Lng32 ExeCliInterface::executeImmediate(const char * 
stmtStr,
   Lng32 retcode = 0;
 
   ComDiagsArea * tempDiags = NULL;
-  if (globalDiags != NULL && *globalDiags != NULL)
+  if (globalDiags != NULL && *globalDiags != NULL && 
(*globalDiags)->getNumber() > 0)
   {
      tempDiags = ComDiagsArea::allocate(heap_);
      tempDiags->mergeAfter(**globalDiags);
@@ -1119,7 +1119,7 @@ ExecuteImmediateReturn:
     {
       // Allocate the diagnostics area if needed
       // and populate the diagnostics conditions
-      if (*globalDiags == NULL && retcode != 0) {
+      if (*globalDiags == NULL && retcode != 0 && retcode != 100) {
          *globalDiags = ComDiagsArea::allocate(getHeap());
          SQL_EXEC_MergeDiagnostics_Internal(**globalDiags);
       }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/executor/ExExeUtilLoad.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilLoad.cpp 
b/core/sql/executor/ExExeUtilLoad.cpp
index 5f33b72..5024f8c 100644
--- a/core/sql/executor/ExExeUtilLoad.cpp
+++ b/core/sql/executor/ExExeUtilLoad.cpp
@@ -124,9 +124,7 @@ short ExExeUtilCreateTableAsTcb::work()
   ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
   ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
   ContextCli *currContext = masterGlob->getStatement()->getContext();
-  
-  ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()->
-    
castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction();
+  ExTransaction *ta = currContext->getTransaction();
   
   while (1)
     {
@@ -535,69 +533,24 @@ short ExExeUtilCreateTableAsTcb::work()
 
        case DONE_:
          {
-           if (qparent_.up->isFull())
-             return WORK_OK;
-
-           // Return EOF.
-           ex_queue_entry * up_entry = qparent_.up->getTailEntry();
-           
-           up_entry->upState.parentIndex = 
-             pentry_down->downState.parentIndex;
-           
-           up_entry->upState.setMatchNo(0);
-           up_entry->upState.status = ex_queue::Q_NO_DATA;
-           
-           // insert into parent
-           qparent_.up->insert();
-           
+           retcode = handleDone();
+           if (retcode == 1)
+              return WORK_OK;
            step_ = INITIAL_;
-           qparent_.down->removeHead();
-           
            return WORK_OK;
          }
        break;
 
        case ERROR_:
          {
-           if (qparent_.up->isFull())
-             return WORK_OK;
-
-           // Return EOF.
-           ex_queue_entry * up_entry = qparent_.up->getTailEntry();
-           
-           up_entry->upState.parentIndex = 
-             pentry_down->downState.parentIndex;
-           
-           up_entry->upState.setMatchNo(0);
-           up_entry->upState.status = ex_queue::Q_SQLERROR;
-
-           ComDiagsArea *diagsArea = up_entry->getDiagsArea();
-           
-           if (diagsArea == NULL)
-             diagsArea = 
-               ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
-            else
-              diagsArea->incrRefCount (); // setDiagsArea call below will decr 
ref count
-           
-           if (getDiagsArea())
-             diagsArea->mergeAfter(*getDiagsArea());
-           
-           up_entry->setDiagsArea (diagsArea);
-           
-           // insert into parent
-           qparent_.up->insert();
-           
-           pstate.matches_ = 0;
-
+           retcode = handleError();
+           if (retcode == 1)
+              return WORK_OK;
            step_ = DONE_;
          }
        break;
-
-
        } // switch
     } // while
-
-  
   
   return WORK_OK;
 
@@ -1679,74 +1632,20 @@ short ExExeUtilHBaseBulkLoadTcb::work()
 
     case DONE_:
     {
-      if (qparent_.up->isFull())
-        return WORK_OK;
-
-      // Return EOF.
-      ex_queue_entry * up_entry = qparent_.up->getTailEntry();
-
-      up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
-
-      up_entry->upState.setMatchNo(0);
-      up_entry->upState.status = ex_queue::Q_NO_DATA;
-
-      ComDiagsArea *diagsArea = up_entry->getDiagsArea();
-
-      if (diagsArea == NULL)
-        diagsArea = ComDiagsArea::allocate(getMyHeap());
-      else
-        diagsArea->incrRefCount(); // setDiagsArea call below will decr ref 
count
-
-      if (getDiagsArea())
-        diagsArea->mergeAfter(*getDiagsArea());
-
+      retcode = handleDone();
+      if (retcode == 1)
+         return WORK_OK;
       masterGlob->setRowsAffected(rowsAffected_);
-
-      up_entry->setDiagsArea(diagsArea);
-
-      // insert into parent
-      qparent_.up->insert();
       step_ = INITIAL_;
-      qparent_.down->removeHead();
       return WORK_OK;
     }
     break;
 
     case LOAD_ERROR_:
     {
-      if (qparent_.up->isFull())
-        return WORK_OK;
-
-      // Return EOF.
-      ex_queue_entry * up_entry = qparent_.up->getTailEntry();
-
-      up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
-
-      up_entry->upState.setMatchNo(0);
-      up_entry->upState.status = ex_queue::Q_SQLERROR;
-
-      ComDiagsArea *diagsArea = up_entry->getDiagsArea();
-
-      if (diagsArea == NULL)
-        diagsArea = ComDiagsArea::allocate(getMyHeap());
-      else
-        diagsArea->incrRefCount(); // setDiagsArea call below will decr ref 
count
-
-      if (getDiagsArea())
-        {
-        diagsArea->mergeAfter(*getDiagsArea());
-          diagsArea->setRowCount(rowsAffected_);
-        }
-
-      up_entry->setDiagsArea(diagsArea);
-
-      // insert into parent
-      qparent_.up->insert();
-
-      pstate.matches_ = 0;
-
-
-
+      retcode = handleError();
+      if (retcode == 1)
+         return WORK_OK;
       step_ = DONE_;
     }
     break;
@@ -2153,9 +2052,8 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
 
   ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
   ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) 
pentry_down->pstate);
-
-  ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()->
-      
castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction();
+  ExMasterStmtGlobals *masterGlob = 
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
+  ExTransaction *ta = 
masterGlob->getStatement()->getContext()->getTransaction();
 
   while (1)
   {
@@ -2177,7 +2075,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
       {
          ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, retcode, 
                 "ExpHbaseInterface_JNI::init"); 
-         handleError();
          step_ = UNLOAD_END_ERROR_;
          break;
       }
@@ -2327,7 +2224,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
           ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, 
hbcRetCode, 
                 "HBaseClient_JNI::createSnapshot/verifySnapshot", 
                 snapshotsList_->at(i)->snapshotName->data() );
-          handleError();
           step_ = UNLOAD_END_ERROR_;
           break;
         }
@@ -2389,7 +2285,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
           ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, 
hbcRetCode, 
                 "HBaseClient_JNI::createSnapshot/verifySnapshot", 
                 snapshotsList_->at(i)->snapshotName->data() );
-          handleError();
           step_ = UNLOAD_END_ERROR_;
           break;
         }
@@ -2477,36 +2372,11 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
 
     case DONE_:
     {
-      if (qparent_.up->isFull())
+      retcode = handleDone();
+      if (retcode == 1)
         return WORK_OK;
-
-      // Return EOF.
-      ex_queue_entry * up_entry = qparent_.up->getTailEntry();
-
-      up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
-
-      up_entry->upState.setMatchNo(0);
-      up_entry->upState.status = ex_queue::Q_NO_DATA;
-
-      ComDiagsArea *diagsArea = up_entry->getDiagsArea();
-
-      if (diagsArea == NULL)
-        diagsArea = ComDiagsArea::allocate(getMyHeap());
-      else
-        diagsArea->incrRefCount(); // setDiagsArea call below will decr ref 
count
-
-      diagsArea->setRowCount(rowsAffected_);
-
-      if (getDiagsArea())
-        diagsArea->mergeAfter(*getDiagsArea());
-
-      up_entry->setDiagsArea(diagsArea);
-
-      // insert into parent
-      qparent_.up->insert();
+      masterGlob->setRowsAffected(rowsAffected_);
       step_ = INITIAL_;
-      qparent_.down->removeHead();
-
       freeResources();
       return WORK_OK;
     }
@@ -2514,36 +2384,9 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
 
     case UNLOAD_ERROR_:
     {
-      if (qparent_.up->isFull())
-        return WORK_OK;
-
-      // Return EOF.
-      ex_queue_entry * up_entry = qparent_.up->getTailEntry();
-
-      up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
-
-      up_entry->upState.setMatchNo(0);
-      up_entry->upState.status = ex_queue::Q_SQLERROR;
-
-      ComDiagsArea *diagsArea = up_entry->getDiagsArea();
-
-      if (diagsArea == NULL)
-        diagsArea = ComDiagsArea::allocate(getMyHeap());
-      else
-        diagsArea->incrRefCount(); // setDiagsArea call below will decr ref 
count
-
-      if (getDiagsArea())
-        diagsArea->mergeAfter(*getDiagsArea());
-
-      up_entry->setDiagsArea(diagsArea);
-
-      // insert into parent
-      qparent_.up->insert();
-
-      pstate.matches_ = 0;
-
-
-
+      retcode = handleError();
+      if (retcode == 1)
+         return WORK_OK;
       step_ = DONE_;
     }
     break;
@@ -2552,7 +2395,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
   } // while
 
   return WORK_OK;
-
 }
 
 short ExExeUtilHBaseBulkUnLoadTcb::moveRowToUpQueue(const char * row, Lng32 
len,
@@ -3447,7 +3289,6 @@ short ExExeUtilLobExtractTcb::work()
            retcode = handleError();
            if (retcode == 1)
              return WORK_OK;
-
            step_ = DONE_;
          }
          break;
@@ -3457,7 +3298,6 @@ short ExExeUtilLobExtractTcb::work()
            retcode = handleDone();
            if (retcode == 1)
              return WORK_OK;
-
            step_ = EMPTY_;
            return WORK_OK;
          }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index f77d714..d09f6cd 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -255,6 +255,8 @@ ExHdfsScanTcb::~ExHdfsScanTcb()
 
 void ExHdfsScanTcb::freeResources()
 {
+  if (hdfsScan_ != NULL)
+     hdfsScan_->stop();
   if (loggingFileName_ != NULL) {
      NADELETEBASIC(loggingFileName_, getHeap());
      loggingFileName_ = NULL;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 244a2bc..0d7b41b 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -3033,7 +3033,7 @@ XDDkwd__(SUBQUERY_UNNESTING,                      "ON"),
   // Use large queues on RHS of Flow/Nested Join when appropriate
   DDkwd__(USE_LARGE_QUEUES,                     "ON"),
 
-  DDkwd__(USE_LIBHDFS_SCAN,                     "ON"),
+  DDkwd__(USE_LIBHDFS_SCAN,                     "OFF"),
 
   DDkwd__(USE_MAINTAIN_CONTROL_TABLE,          "OFF"),
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/060bfc6c/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index ff78d3d..0346bef 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -470,7 +470,7 @@ public class HDFSClient
    {
       if (future_ != null) {
          try {
-           future_.get(200, TimeUnit.MILLISECONDS);
+           future_.get(30, TimeUnit.SECONDS);
          } catch(TimeoutException e) {
             logger_.error("Asynchronous Thread of HdfsScan is Cancelled 
(timeout), ", e);
             future_.cancel(true);

Reply via email to