Repository: incubator-trafodion Updated Branches: refs/heads/master 84eeac7a2 -> 28bc2747e
[TRAFODION-2129] Trafodion to avoid use of deprecated HBase APIs/Classes. PR 629 for the above JIRA exposed the following issues. 1) A transaction remained in HUNGABORTED state when a table was created and dropped in the same transaction. core/TEST116 exhibited one such hung aborted transaction. 2) The retry logic did not throw any exception when the allowed number of retry attempts was exhausted. 3) When an UnknownTransactionException was encountered in the response to a coprocessor request, the commit got into an endless loop. privs1/TEST133 was returning error 97 on commit transaction. All the above issues have been fixed. [TRAFODION-1988] Better java exception handling in the java/JNI layer of TM. Cleaned up the exception logging in TM further and allowed more events to be logged via the Common Logger facility. Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/98a0daa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/98a0daa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/98a0daa7 Branch: refs/heads/master Commit: 98a0daa7cfc68aef62014d88377eb2e3b4f957f3 Parents: 37901fb Author: selvaganesang <selva.govindara...@esgyn.com> Authored: Sat Aug 6 04:43:12 2016 +0000 Committer: selvaganesang <selva.govindara...@esgyn.com> Committed: Sat Aug 6 04:43:12 2016 +0000 ---------------------------------------------------------------------- .../transactional/TransactionManager.java | 520 ++++++++----------- core/sqf/src/tm/tmlogging.cpp | 138 +++-- core/sqf/src/tm/tmlogging.h | 2 - 3 files changed, 277 insertions(+), 383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/98a0daa7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java 
---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index 425802e..0b4888e 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -205,6 +205,15 @@ public class TransactionManager { return g_TransactionManager; } + public static int retry(int retrySleep) { + try { + Thread.sleep(retrySleep); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } + return (retrySleep += TM_SLEEP_INCR); + } + /* increment/deincrement for positive value */ /* This method copied from o.a.h.h.utils.Bytes */ public static byte [] binaryIncrementPos(byte [] value, long amount) { @@ -447,20 +456,9 @@ public class TransactionManager { if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- setting retry, count: " + retryCount); refresh = false; } - - retryCount++; - - if (retryCount < RETRY_ATTEMPTS && retry == true) { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - retrySleep += TM_SLEEP_INCR; - } - - } while (retryCount < RETRY_ATTEMPTS && retry == true); + if (retry) + retrySleep = retry(retrySleep); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); } if( TRANSACTION_ALGORITHM == AlgorithmType.SSCC){ @@ -497,12 +495,12 @@ public class TransactionManager { } catch (ServiceException se) { String msg = "ERROR occurred while calling doCommitX coprocessor service in doCommitX"; LOG.error(msg + ":", se); - transactionState.requestPendingCountDec(true); - throw new DoNotRetryIOException(msg, se); + throw new RetryTransactionException(msg,se); } catch 
(Throwable e) { String msg = "ERROR occurred while calling doCommitX coprocessor service in doCommitX"; LOG.error(msg + ":", e); - throw new RetryTransactionException(msg, e); + transactionState.requestPendingCountDec(true); + throw new DoNotRetryIOException(msg, e); } if(result.size() != 1) { LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + transactionId); @@ -540,7 +538,7 @@ public class TransactionManager { throw ute; } catch (RetryTransactionException rte) { - if(retryCount == RETRY_ATTEMPTS) { + if (retryCount == RETRY_ATTEMPTS) { LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, rte); // We have received our reply in the form of an exception, // so decrement outstanding count and wake up waiters to avoid @@ -569,20 +567,10 @@ public class TransactionManager { if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- setting retry, count: " + retryCount); refresh = false; } + if (retry) + retrySleep = retry(retrySleep); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); - retryCount++; - - if (retryCount < RETRY_ATTEMPTS && retry == true) { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - retrySleep += TM_SLEEP_INCR; - } - - } while (retryCount < RETRY_ATTEMPTS && retry == true); } // We have received our reply so decrement outstanding count transactionState.requestPendingCountDec(false); @@ -644,6 +632,7 @@ public class TransactionManager { } catch (Throwable e) { String errMsg = new String("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":"); LOG.error(errMsg, e); + transactionState.requestPendingCountDec(true); throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error", e); } @@ -686,12 +675,6 @@ public class TransactionManager { retry = false; } else { - try { - // Pause for split to complete and retry - Thread.sleep(100); - } 
catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } retry = true; } } @@ -760,20 +743,12 @@ public class TransactionManager { if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- setting retry, count: " + retryCount); refresh = false; } + else // Retry immediately if refresh is done + if (retry) + retrySleep = retry(retrySleep); + transactionState.setRetried(true); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); - retryCount++; - - if (retryCount < RETRY_ATTEMPTS && retry == true) { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - retrySleep += TM_SLEEP_INCR; - } - - } while (retryCount < RETRY_ATTEMPTS && retry == true); } if( TRANSACTION_ALGORITHM == AlgorithmType.SSCC){ do { @@ -853,20 +828,10 @@ public class TransactionManager { if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- setting retry, count: " + retryCount); refresh = false; } + if (retry) + retrySleep = retry(retrySleep); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); - retryCount++; - - if (retryCount < RETRY_ATTEMPTS && retry == true) { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - retrySleep += TM_SLEEP_INCR; - } - - } while (retryCount < RETRY_ATTEMPTS && retry == true); } if (LOG.isTraceEnabled()) LOG.trace("commitStatus for transId(" + transactionId + "): " + commitStatus + " TableName " + table.toString() @@ -999,17 +964,17 @@ public class TransactionManager { throw ute; } catch (RetryTransactionException rte) { - if (rte.toString().contains("Asked to commit a non-pending transaction ")) { - LOG.error(" doCommitX will not retry transaction: " + transactionId , rte); - refresh = false; - retry = false; - } if (retryCount == RETRY_ATTEMPTS) { String errMsg = "Exceeded retry attempts in doAbortX: " + retryCount; LOG.error(errMsg, rte); transactionState.requestPendingCountDec(true); throw new DoNotRetryIOException(errMsg, 
rte); } + else if (rte.toString().contains("Asked to commit a non-pending transaction ")) { + LOG.error(" doCommitX will not retry transaction: " + transactionId , rte); + refresh = false; + retry = false; + } else { LOG.error("doAbortX retrying transaction: " + transactionId + " participantNum: " + participantNum + " region: " + Bytes.toString(regionName) + " due to Exception: " ,rte ); @@ -1029,21 +994,11 @@ public class TransactionManager { table.getRegionLocation(startKey, true); if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- setting retry, count: " + retryCount); refresh = false; - } - - retryCount++; - - if (retryCount < RETRY_ATTEMPTS && retry == true) { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - retrySleep += TM_SLEEP_INCR; - } + } + if (retry) + retrySleep = retry(retrySleep); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); - } while (retryCount < RETRY_ATTEMPTS && retry == true); } if( TRANSACTION_ALGORITHM == AlgorithmType.SSCC){ do { @@ -1074,7 +1029,7 @@ public class TransactionManager { } catch (ServiceException se) { String msg = "ERROR occurred while calling doAbortX coprocessor service"; LOG.error(msg + ":", se); - throw new DoNotRetryIOException(msg, se); + throw new RetryTransactionException(msg, se); } catch (Throwable e) { String msg = "ERROR occurred while calling doAbortX coprocessor service"; LOG.error(msg + ":", e); @@ -1109,17 +1064,17 @@ public class TransactionManager { throw ute; } catch (RetryTransactionException rte) { - if (retryCount == RETRY_ATTEMPTS){ + if (retryCount == RETRY_ATTEMPTS){ String errMsg = new String ("Exceeded retry attempts in doAbortX: " + retryCount + " (Not ingoring)"); LOG.error(errMsg); transactionState.requestPendingCountDec(true); throw new RollbackUnsuccessfulException(errMsg, rte); - } - LOG.error("doAbortX participant " + participantNum + " retrying transaction " - + transactionId + " due to Exception: " + rte); - refresh = 
true; - retry = true; } + LOG.error("doAbortX participant " + participantNum + " retrying transaction " + + transactionId + " due to Exception: " + rte); + refresh = true; + retry = true; + } if (refresh) { HRegionLocation lv_hrl = table.getRegionLocation(startKey); @@ -1134,20 +1089,10 @@ public class TransactionManager { if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- setting retry, count: " + retryCount); refresh = false; } + if (retry) + retrySleep = retry(retrySleep); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); - retryCount++; - - if (retryCount < RETRY_ATTEMPTS && retry == true) { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - - retrySleep += TM_SLEEP_INCR; - } - - } while (retryCount < RETRY_ATTEMPTS && retry == true); } admin.close(); // We have received our reply so decrement outstanding count @@ -1189,101 +1134,103 @@ public class TransactionManager { TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel); commitMultipleResponse = trxService.commitMultiple(null, commitMultipleRequest); retry = false; - } catch (ServiceException se) { - String errMsg = "doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; - LOG.error(errMsg, se); - refresh = true; - retry = true; - } catch (Throwable e) { - String errMsg = "doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; - LOG.error(errMsg,e); - throw new CommitUnsuccessfulException(errMsg, e); - } - if(!retry) { - List<String> exceptions = commitMultipleResponse.getExceptionList(); - - checkException(transactionState, locations, exceptions); - if(transactionState.getRegionsRetryCount() > 0) { - retryCommit(transactionState, true); - } - } - } - catch (RetryTransactionException rte) { - if(retryCount == RETRY_ATTEMPTS) { - 
LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, rte); - transactionState.requestPendingCountDec(true); - throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, - rte); - } - LOG.error("doCommitX retrying transaction " + transactionId - + " participant " + participantNum + " due to Exception: ", rte); - refresh = true; - retry = true; - } - if (refresh) { - - HRegionLocation lv_hrl = table.getRegionLocation(startKey); - HRegionInfo lv_hri = lv_hrl.getRegionInfo(); - - if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: " - + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId); - - if (LOG.isWarnEnabled()) { - LOG.warn("doCommitX -- " + table.toString() + " location being refreshed"); - LOG.warn("doCommitX -- lv_hri: " + lv_hri); - LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo()); - } - table.getRegionLocation(startKey, true); - - if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- setting retry, count: " + retryCount); - refresh = false; - retryCount++; - } - } while (retryCount < RETRY_ATTEMPTS && retry == true); - - transactionState.requestPendingCountDec(false); - - if (LOG.isTraceEnabled()) LOG.trace("doCommitX - Batch -- EXIT txid: " + transactionId); - return 0; - } - - public Integer doPrepareX(final List<TransactionRegionLocation> locations, final long transactionId, final int participantNum) - throws IOException, CommitUnsuccessfulException { - if (LOG.isTraceEnabled()) LOG.trace("doPrepareX - Batch -- ENTRY txid: " + transactionId - + " participant " + participantNum ); - - boolean refresh = false; - boolean retry = false; - int retryCount = 0; - List<Integer> results = null; - do { - try { - - TrxRegionProtos.CommitRequestMultipleRequest.Builder builder = 
CommitRequestMultipleRequest.newBuilder(); - builder.setTransactionId(transactionId); - builder.setParticipantNum(participantNum); - for(TransactionRegionLocation location : locations) { - builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName())); - } - TrxRegionProtos.CommitRequestMultipleRequest commitMultipleRequest = builder.build(); - TrxRegionProtos.CommitRequestMultipleResponse commitMultipleResponse = null; - - try { - CoprocessorRpcChannel channel = table.coprocessorService(startKey); - TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel); - commitMultipleResponse = trxService.commitRequestMultiple(null, commitMultipleRequest); - retry = false; - } catch (ServiceException se) { - String errMsg = new String("doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId ); - LOG.error(errMsg, se); - refresh = true; - retry = true; - } catch (Throwable e) { - String errMsg = "doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; - LOG.error(errMsg, e); - throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error", e); - } - if(!retry) { + } catch (ServiceException se) { + String msg = "doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; + LOG.error(msg, se); + refresh = true; + throw new RetryTransactionException(msg, se); + } catch (Throwable e) { + String errMsg = "doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; + LOG.error(errMsg,e); + transactionState.requestPendingCountDec(true); + throw new CommitUnsuccessfulException(errMsg, e); + } + if(!retry) { + List<String> exceptions = commitMultipleResponse.getExceptionList(); + + 
checkException(transactionState, locations, exceptions); + if(transactionState.getRegionsRetryCount() > 0) { + retryCommit(transactionState, true); + } + } + } + catch (RetryTransactionException rte) { + if(retryCount == RETRY_ATTEMPTS) { + LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, rte); + transactionState.requestPendingCountDec(true); + throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, + rte); + } + LOG.error("doCommitX retrying transaction " + transactionId + + " participant " + participantNum + " due to Exception: ", rte); + refresh = true; + retry = true; + } + if (refresh) { + + HRegionLocation lv_hrl = table.getRegionLocation(startKey); + HRegionInfo lv_hri = lv_hrl.getRegionInfo(); + + if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: " + + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId); + + if (LOG.isWarnEnabled()) { + LOG.warn("doCommitX -- " + table.toString() + " location being refreshed"); + LOG.warn("doCommitX -- lv_hri: " + lv_hri); + LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo()); + } + table.getRegionLocation(startKey, true); + + if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- setting retry, count: " + retryCount); + refresh = false; + } + } while (retry && retryCount++ <= RETRY_ATTEMPTS); + + + transactionState.requestPendingCountDec(false); + + if (LOG.isTraceEnabled()) LOG.trace("doCommitX - Batch -- EXIT txid: " + transactionId); + return 0; + } + + public Integer doPrepareX(final List<TransactionRegionLocation> locations, final long transactionId, final int participantNum) + throws IOException, CommitUnsuccessfulException { + if (LOG.isTraceEnabled()) LOG.trace("doPrepareX - Batch -- ENTRY txid: " + transactionId + + " 
participant " + participantNum ); + + boolean refresh = false; + boolean retry = false; + int retryCount = 0; + List<Integer> results = null; + do { + try { + + TrxRegionProtos.CommitRequestMultipleRequest.Builder builder = CommitRequestMultipleRequest.newBuilder(); + builder.setTransactionId(transactionId); + builder.setParticipantNum(participantNum); + for(TransactionRegionLocation location : locations) { + builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName())); + } + TrxRegionProtos.CommitRequestMultipleRequest commitMultipleRequest = builder.build(); + TrxRegionProtos.CommitRequestMultipleResponse commitMultipleResponse = null; + + try { + CoprocessorRpcChannel channel = table.coprocessorService(startKey); + TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel); + commitMultipleResponse = trxService.commitRequestMultiple(null, commitMultipleRequest); + retry = false; + } catch (ServiceException se) { + String errMsg = new String("doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId ); + LOG.error(errMsg, se); + refresh = true; + throw new RetryTransactionException(errMsg, se); + } catch (Throwable e) { + String errMsg = "doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; + LOG.error(errMsg, e); + transactionState.requestPendingCountDec(true); + throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error", e); + } + if(!retry) { results = commitMultipleResponse.getResultList(); //commitStatus = value; List<String> exceptions = commitMultipleResponse.getExceptionList(); @@ -1321,9 +1268,9 @@ public class TransactionManager { if (LOG.isDebugEnabled()) LOG.debug("doPrepareX -Batch- retry count: " + retryCount); if (LOG.isTraceEnabled()) LOG.trace("doPrepareX --Batch-- setting retry, count: " + 
retryCount); refresh = false; - retryCount++; } - } while (retryCount < RETRY_ATTEMPTS && retry == true); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); + // Process the results of the list here @@ -1406,12 +1353,14 @@ public class TransactionManager { abortTransactionMultipleResponse = trxService.abortTransactionMultiple(null, abortTransactionMultipleRequest); retry = false; } catch (ServiceException se) { - LOG.error("doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":",se); + String errMsg = "doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; refresh = true; - retry = true; + LOG.error(errMsg, se); + throw new RetryTransactionException(errMsg, se); } catch (Throwable e) { String errMsg = "doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId; LOG.error(errMsg, e); + transactionState.requestPendingCountDec(true); throw new RollbackUnsuccessfulException("doAbortX, Batch - coprocessor error", e); } if(!retry) { @@ -1451,9 +1400,9 @@ public class TransactionManager { if (LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch - -- setting retry, count: " + retryCount); refresh = false; - retryCount++; } - } while (retryCount < RETRY_ATTEMPTS && retry == true); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); + transactionState.requestPendingCountDec(false); if(LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch -- EXIT txID: " + transactionId); @@ -2231,7 +2180,7 @@ public class TransactionManager { retry = false; } catch(IOException e){ - LOG.error("Exception in doCommitDDL, Step: getRow. txID: " + transactionState.getTransactionId() + "Exception: " + e); + LOG.info("Exception in doCommitDDL, Step: getRow. 
txID: " + transactionState.getTransactionId() + "Exception: " , e); if(retryCount == RETRY_ATTEMPTS) { @@ -2240,19 +2189,11 @@ public class TransactionManager { //if tmDDL is unreachable at this point, it is fatal. throw new UnsuccessfulDDLException(e); } + if (retry) + retrySleep = retry(retrySleep); + } + } while (retry && retryCount++ <= RETRY_ATTEMPTS); - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } - } - } while (retryCount < RETRY_ATTEMPTS && retry == true); if (state.toString().equals("VALID") && dropList.size() > 0) { @@ -2270,15 +2211,15 @@ public class TransactionManager { deleteTable(transactionState, tblName); retry = false; } - catch(TableNotFoundException t){ + catch (TableNotFoundException t) { //Check for TableNotFoundException, if that is the case, no further //processing needed. This is not an error. Possible we are retrying the entire set of DDL changes - //because this transaction was pinned for some reason. - if(LOG.isTraceEnabled()) LOG.trace(" TableNotFoundException exception in doCommitDDL deleteTable, Continuing: txID: " + transactionState.getTransactionId()); + //because tis transaction was pinned for some reason. + LOG.info(" Exception for " + tblName + ", but continuing txID: " + transactionState.getTransactionId(), t); retry = false; } - catch(IOException e){ - LOG.error("Fatal exception in doCommitDDL, Step : DeleteTable: TxID:" + transactionState.getTransactionId() + "Exception: " + e); + catch (IOException e) { + LOG.info("Fatal exception in doCommitDDL, Step : DeleteTable: TxID:" + transactionState.getTransactionId() + "Exception: " , e); if(retryCount == RETRY_ATTEMPTS) { @@ -2288,19 +2229,10 @@ public class TransactionManager { //Throwing a new exception gets out of the loop. 
throw new UnsuccessfulDDLException(e); } - - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } + if (retry) + retrySleep = retry(retrySleep); } - }while (retryCount < RETRY_ATTEMPTS && retry == true); + } while (retry == true); }//while } @@ -2316,7 +2248,7 @@ public class TransactionManager { } catch (IOException e) { - LOG.error("Fatal Exception in doCommitDDL, Step: deleteRow. txID: " + transactionState.getTransactionId() + "Exception: " + e); + LOG.info("Fatal Exception in doCommitDDL, Step: deleteRow. txID: " + transactionState.getTransactionId() + "Exception: " , e); if(retryCount == RETRY_ATTEMPTS) { @@ -2327,21 +2259,12 @@ public class TransactionManager { throw new UnsuccessfulDDLException(e); } - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } + if (retry) + retrySleep = retry(retrySleep); } - }while (retryCount < RETRY_ATTEMPTS && retry == true); + } while (retry == true); if (LOG.isTraceEnabled()) LOG.trace("doCommitDDL EXIT [" + transactionState.getTransactionId() + "]"); - } @@ -2464,7 +2387,7 @@ public class TransactionManager { retry = false; } catch (IOException e){ - LOG.error("Fatal Exception in abortDDL, Step: getRow. txID: " + transactionState.getTransactionId() + "Exception: " + e); + LOG.info("Fatal Exception in abortDDL, Step: getRow. 
txID: " + transactionState.getTransactionId() + "Exception: " , e); if(retryCount == RETRY_ATTEMPTS) { @@ -2474,18 +2397,11 @@ public class TransactionManager { throw new UnsuccessfulDDLException(e); } - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } + if (retry) + retrySleep = retry(retrySleep); } - } while (retryCount < RETRY_ATTEMPTS && retry == true); + } while (retry && retryCount++ <= RETRY_ATTEMPTS); + // if tables were recorded to be truncated on an upsert using load, // then they will be truncated on an abort transaction @@ -2507,7 +2423,7 @@ public class TransactionManager { retry = false; } catch (IOException e){ - LOG.error("Fatal exception in abortDDL, Step : truncateTable: TxID:" + transactionState.getTransactionId() + "Exception: " + e); + LOG.info("Fatal exception in abortDDL, Step : truncateTable: TxID:" + transactionState.getTransactionId() + "Exception: ", e); if(retryCount == RETRY_ATTEMPTS) { @@ -2517,19 +2433,10 @@ public class TransactionManager { //Throwing a new exception gets out of the loop. throw new UnsuccessfulDDLException(e); } - - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } + if (retry) + retrySleep = retry(retrySleep); } - }while(retryCount < RETRY_ATTEMPTS && retry == true); + } while(retry); }//while } @@ -2548,15 +2455,15 @@ public class TransactionManager { deleteTable(transactionState, tblName); retry = false; } - catch(TableNotFoundException t){ + catch (TableNotFoundException t) { //Check for TableNotFoundException, if that is the case, no further //processing needed. This is not an error. Possible we are retrying the entire set of DDL changes //because this transaction is being redriven for some reason. 
- if(LOG.isTraceEnabled()) LOG.trace(" TableNotFoundException exception in abortDDL deleteTable, Continuing: txID: " + transactionState.getTransactionId()); + LOG.info(" Exception for " + tblName + ", but continuing txID: " + transactionState.getTransactionId(), t); retry = false; } - catch (IOException e){ - LOG.error("Fatal exception in abortDDL, Step : DeleteTable: TxID:" + transactionState.getTransactionId() + "Exception: " + e); + catch (IOException e) { + LOG.info("Fatal exception in abortDDL, Step : DeleteTable: TxID:" + transactionState.getTransactionId() + "Exception: " , e); if(retryCount == RETRY_ATTEMPTS) { @@ -2567,19 +2474,10 @@ public class TransactionManager { throw new UnsuccessfulDDLException(e); } - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } - + if (retry) + retrySleep = retry(retrySleep); } - }while(retryCount < RETRY_ATTEMPTS && retry == true); + } while(retry); }//while } @@ -2595,7 +2493,34 @@ public class TransactionManager { retrySleep = TM_SLEEP; retry = true; String tblName = di.next(); - enableTable(transactionState, tblName); + do + { + try { + enableTable(transactionState, tblName); + retry = false; + } + catch (TableNotFoundException t) { + //Check for TableNotFoundException, if that is the case, no further + //processing needed. This would happen if the table is created and dropped in the same transaction + LOG.info(" Exception for " + tblName + ", but continuing txID: " + transactionState.getTransactionId(), t); + retry = false; + } + catch (IOException e) { + LOG.info("Fatal exception in abortDDL, Step : enableTable: TxID:" + transactionState.getTransactionId() + "Exception: ", e); + if(retryCount == RETRY_ATTEMPTS) + { + LOG.error("Fatal Exception in abortDDL, Step: enableTable. 
Raising UnsuccessfulDDLException TxID:" + transactionState.getTransactionId() ); + + //Throw this exception after all retry attempts. + //Throwing a new exception gets out of the loop. + throw new UnsuccessfulDDLException(e); + } + + if (retry) + retrySleep = retry(retrySleep); + } + } while (retry && retryCount++ <= RETRY_ATTEMPTS); + }//while } @@ -2611,8 +2536,7 @@ public class TransactionManager { } catch (IOException e) { - LOG.error("Fatal Exception in abortDDL, Step: deleteRow. txID: " + transactionState.getTransactionId() + "Exception: " + e); - + LOG.info("Fatal Exception in abortDDL, Step: deleteRow. txID: " + transactionState.getTransactionId() + "Exception: ", e); if(retryCount == RETRY_ATTEMPTS) { LOG.error("Fatal Exception in abortDDL, Step: deleteRow. Raising UnsuccessfulDDLException. txID: " + transactionState.getTransactionId()); @@ -2620,20 +2544,10 @@ public class TransactionManager { //Throwing a new exception gets out of the loop. throw new UnsuccessfulDDLException(e); } - - retryCount++; - if (retryCount < RETRY_ATTEMPTS) - { - try { - Thread.sleep(retrySleep); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - retrySleep += TM_SLEEP_INCR; - } - + if (retry) + retrySleep = retry(retrySleep); } - }while(retryCount < RETRY_ATTEMPTS && retry == true); + } while(retry); } public synchronized JtaXAResource getXAResource() { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/98a0daa7/core/sqf/src/tm/tmlogging.cpp ---------------------------------------------------------------------- diff --git a/core/sqf/src/tm/tmlogging.cpp b/core/sqf/src/tm/tmlogging.cpp index 0dca85e..d3ff409 100644 --- a/core/sqf/src/tm/tmlogging.cpp +++ b/core/sqf/src/tm/tmlogging.cpp @@ -61,64 +61,6 @@ int tm_log_write(int pv_event_type, posix_sqlog_severity_t pv_severity, char *er return 0; } -int tm_alt_log_write(int eventType, posix_sqlog_severity_t severity, char *msg) { - static int logFileType = SBX_LOG_TYPE_LOGFILE; - static 
char startTimeFmt[20] = ""; - - char logFileDir[PATH_MAX]; - char *logFileDirPtr; - char logFilePrefix[25]; - char *rootDir; - - struct timeval startTime; - struct tm * ltime; - - if ((logFileType&SBX_LOG_TYPE_LOGFILE_PERSIST) != SBX_LOG_TYPE_LOGFILE_PERSIST) - { - // getting date time for log file name - gettimeofday(&startTime, NULL); - ltime = localtime(&startTime.tv_sec); - sprintf(startTimeFmt, "%02d%02d%02d.%02d.%02d.%02d", ltime->tm_mon+1, ltime->tm_mday, ltime->tm_year-100, ltime->tm_hour, ltime->tm_min, ltime->tm_sec); - } - - // directory to write log file - rootDir = getenv("MY_SQROOT"); - if (rootDir == NULL) - { - logFileDirPtr = NULL; - } - else - { - sprintf(logFileDir, "%s/logs", rootDir); - logFileDirPtr = logFileDir; - } - - // log file prefix will be tm.<date>.hh.mm.ss - sprintf(logFilePrefix, "tm.%s", (char *)&startTimeFmt); - - SBX_log_write(logFileType, // log_type - logFileDirPtr, // log_file_dir - logFilePrefix, // log_file_prefix - SQEVL_DTM, // component id - eventType, // event id - SQ_LOG_SEAQUEST, // facility - severity, // severity - "TM", // name - NULL, // msg_prefix - msg, // msg - NULL, // snmptrap_cmd - NULL, // msg_snmptrap - NULL, // msg_ret - 0); // msg_ret size - - // write to the same file in future without opening and closing it. 
- if (logFileType == SBX_LOG_TYPE_LOGFILE) - { - logFileType |= SBX_LOG_TYPE_LOGFILE_PERSIST; - } - return 0; -} - int tm_log_event(int event_id, posix_sqlog_severity_t severity, const char *temp_string, @@ -147,16 +89,57 @@ int tm_log_event(int event_id, { int rc = 0; + char la_buf[DTM_STRING_BUF_SIZE]; + if (gv_dual_logging) { - char la_buf[DTM_STRING_BUF_SIZE]; - strncpy (la_buf, temp_string, DTM_STRING_BUF_SIZE - 1); - tm_log_stdout(event_id, severity, la_buf, error_code, -1, rmid, dtmid, seq_num, msgid, xa_error, + tm_log_stdout(event_id, severity, temp_string, error_code, -1, rmid, dtmid, seq_num, msgid, xa_error, pool_size, pool_elems, msg_retries, pool_high, pool_low, pool_max, tx_state, data, data1, data2, string1, node, msgid2, offset, tm_event_msg, data4); } - - return rc; + char my_processName[MS_MON_MAX_PROCESS_NAME+1]; + int my_nid,my_pid; + logLevel ll_severity = LL_INFO; + getTMLoggingHeaderInfo(severity, ll_severity, my_processName, sizeof(my_processName), my_nid, my_pid); + la_buf[0] = '\0'; + if (msgid != -1) + sprintf(la_buf, ", msgid=%d",msgid); + if (xa_error != -1) + sprintf(la_buf+strlen(la_buf), ", XAERR=%d",xa_error); + if (pool_size != -1) + sprintf(la_buf+strlen(la_buf), ", pool_size=%d",pool_size); + if (pool_elems != -1) + sprintf(la_buf+strlen(la_buf), ", elements in pool=%d",pool_elems); + if (msg_retries != -1) + sprintf(la_buf+strlen(la_buf), ", msg retries=%d",msg_retries); + if (pool_high != -1) + sprintf(la_buf+strlen(la_buf), ", pool_high_ss=%d",pool_high); + if (pool_low != -1) + sprintf(la_buf+strlen(la_buf), ", pool_low_ss=%d",pool_low); + if (pool_max != -1) + sprintf(la_buf+strlen(la_buf), ", pool_max_size=%d",pool_max); + if (tx_state != -1) + sprintf(la_buf+strlen(la_buf), ", Txn State=%d",tx_state); + if (data != -1) + sprintf(la_buf+strlen(la_buf), ", data=%d",data); + if (data1 != -1) + sprintf(la_buf+strlen(la_buf), ", data1=%d",data1); + if (data2 != -1) + sprintf(la_buf+strlen(la_buf), ", data2=" PFLL,data2); + if 
(node != -1) + sprintf(la_buf+strlen(la_buf), ", node=%d",node); + if (msgid2 != -1) + sprintf(la_buf+strlen(la_buf), ", msgid2=%d",msgid2); + if (offset != -1) + sprintf(la_buf+strlen(la_buf), ", offset=%d",offset); + if (tm_event_msg != -1) + sprintf(la_buf+strlen(la_buf), ", tm_event_msg=%d",tm_event_msg); + if (data4 != 0) + sprintf(la_buf+strlen(la_buf), ", data4=%u",data4); + + CommonLogger::log(TM_COMPONENT, ll_severity, "Node: %d Pid: %d Name: %s TransId: %d,%d,%d Event: %d Message: %s %s %s", + my_nid, my_pid, my_processName, rmid, dtmid, seq_num, event_id, temp_string, (string1 == NULL ? "" : string1), la_buf); + return rc; } @@ -209,24 +192,23 @@ int tm_log_stdout(int event_id, switch (severity) { case SQ_LOG_EMERG: printf("EMERGENCY"); - break; + break; case SQ_LOG_ALERT: printf("ALERT"); - break; + break; case SQ_LOG_CRIT: printf("CRITICAL"); - break; + break; case SQ_LOG_ERR: printf("ERROR"); - break; + break; case SQ_LOG_WARNING: printf("WARNING"); - break; + break; case SQ_LOG_NOTICE: printf("NOTICE"); - break; + break; case SQ_LOG_INFO: printf("INFO"); - break; + break; case SQ_LOG_DEBUG: printf("DEBUG"); - break; + break; default: printf("%d Unknown", severity); } - printf(", "); if (error_code != -1) printf(", Error=%d",error_code); @@ -307,22 +289,22 @@ void getTMLoggingHeaderInfo(posix_sqlog_severity_t severity, logLevel &ll_severi case SQ_LOG_ALERT: ll_severity = LL_WARN; break; - case SQ_LOG_CRIT: printf("CRITICAL"); + case SQ_LOG_CRIT: ll_severity = LL_FATAL; break; - case SQ_LOG_ERR: printf("ERROR"); + case SQ_LOG_ERR: ll_severity = LL_ERROR; break; - case SQ_LOG_WARNING: printf("WARNING"); + case SQ_LOG_WARNING: ll_severity = LL_WARN; break; - case SQ_LOG_NOTICE: printf("NOTICE"); + case SQ_LOG_NOTICE: ll_severity = LL_INFO; break; - case SQ_LOG_INFO: printf("INFO"); + case SQ_LOG_INFO: ll_severity = LL_INFO; break; - case SQ_LOG_DEBUG: printf("DEBUG"); + case SQ_LOG_DEBUG: ll_severity = LL_DEBUG; break; default: 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/98a0daa7/core/sqf/src/tm/tmlogging.h ---------------------------------------------------------------------- diff --git a/core/sqf/src/tm/tmlogging.h b/core/sqf/src/tm/tmlogging.h index f763a0f..c261ef6 100644 --- a/core/sqf/src/tm/tmlogging.h +++ b/core/sqf/src/tm/tmlogging.h @@ -34,8 +34,6 @@ int tm_init_logging(); int tm_log_write(int pv_event_type, posix_sqlog_severity_t pv_severity, char *err_string, char *exception_stack=NULL, long transid=-1); -int tm_alt_log_write(int eventType, posix_sqlog_severity_t severity, char *msg); - int tm_log_event(int event_type, posix_sqlog_severity_t severity, const char *temp_string,