Refactor the retry logic around the setStoragePolicy coprocessor call in TransactionManager
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/8ee00a80 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/8ee00a80 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/8ee00a80 Branch: refs/heads/master Commit: 8ee00a80db6a4129ab4d1c0968cb59fcdae671d0 Parents: 0d32288 Author: Liu Ming <ovis_p...@sina.com> Authored: Sun Jun 17 21:19:31 2018 -0400 Committer: Liu Ming <ovis_p...@sina.com> Committed: Sun Jun 17 21:19:31 2018 -0400 ---------------------------------------------------------------------- .../transactional/TransactionManager.java | 85 +++++++++++--------- 1 file changed, 49 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/8ee00a80/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index 3705d62..1eceb71 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -3267,42 +3267,59 @@ public class TransactionManager { int retryCount = 0; int retrySleep = TM_SLEEP; boolean retry = false; + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.BlockingInterface service; + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest.Builder request ; try { - Table tbl = connection.getTable(TableName.valueOf(tblName)); - 
String rowkey = "0"; - CoprocessorRpcChannel channel = tbl.coprocessorService(rowkey.getBytes()); - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.BlockingInterface service = - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.newBlockingStub(channel); - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest.Builder request = - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest.newBuilder(); - String hbaseRoot = config.get("hbase.rootdir"); - FileSystem fs = FileSystem.get(config); - //Construct the HDFS dir - //find out if namespace is there - String[] parts = tblName.split(":"); - String namespacestr=""; - String fullPath = hbaseRoot + "/data/" ; - String fullPath2 = hbaseRoot + "/data/default/"; - if(fs.exists(new Path(fullPath2))) - fullPath = fullPath2; - - if(parts.length >1) //have namespace - fullPath = fullPath + parts[0] + "/" + parts[1]; - else - fullPath = fullPath + tblName; - - request.setPath(fullPath); - request.setPolicy(policy); + Table tbl = connection.getTable(TableName.valueOf(tblName)); + String rowkey = "0"; + CoprocessorRpcChannel channel = tbl.coprocessorService(rowkey.getBytes()); + service = + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.newBlockingStub(channel); + request = + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest.newBuilder(); + String hbaseRoot = config.get("hbase.rootdir"); + FileSystem fs = FileSystem.get(config); + //Construct the HDFS dir + //find out if namespace is there + String[] parts = tblName.split(":"); + String namespacestr=""; + String fullPath = hbaseRoot + "/data/" ; + String fullPath2 = hbaseRoot + "/data/default/"; + if(fs.exists(new Path(fullPath2))) + fullPath = fullPath2; + + if(parts.length >1) //have namespace + 
fullPath = fullPath + parts[0] + "/" + parts[1]; + else + fullPath = fullPath + tblName; + + request.setPath(fullPath); + request.setPolicy(policy); + } + catch (Exception e) { + throw new IOException(e); + } do { - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse ret = - service.setStoragePolicy(null,request.build()); - + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse ret = null; + try{ + ret = service.setStoragePolicy(null,request.build()); + } + catch(ServiceException se) { + String msg = new String ("ERROR occurred while calling coprocessor service in setStoragePolicy, retry due to "); + LOG.warn(msg, se); + retry = true; + } + catch(Throwable te) + { + LOG.error("ERROR occurred while calling coprocessor service in setStoragePolicy, not retry due to ", te); + retry = false; + } //handle result and error if( ret == null) { - retry = true; - LOG.error("setStoragePolicy Response ret null "); + retry = false; + LOG.error("setStoragePolicy Response ret null , not retry"); } else if (ret.getStatus() == false) { @@ -3315,11 +3332,7 @@ public class TransactionManager { } if (retry) retrySleep = retry(retrySleep); - } while (retry && retryCount++ < RETRY_ATTEMPTS); - } - catch (Exception e) { - throw new IOException(e); - } - } + } while (retry && retryCount++ < RETRY_ATTEMPTS); + } }