add retry logic
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/335e9e8c Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/335e9e8c Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/335e9e8c Branch: refs/heads/master Commit: 335e9e8c85c6591739eaf026e0e043498bce145e Parents: ae17bf1 Author: Liu Ming <ovis_p...@sina.com> Authored: Sat Jun 2 12:30:19 2018 -0400 Committer: Liu Ming <ovis_p...@sina.com> Committed: Sat Jun 2 12:30:19 2018 -0400 ---------------------------------------------------------------------- .../transactional/TransactionManager.java | 82 +++++++++++--------- 1 file changed, 45 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/335e9e8c/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index aeb86ce..9dd9643 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -3264,53 +3264,61 @@ public class TransactionManager { public void setStoragePolicy(String tblName, String policy) throws IOException { - try{ + int retryCount = 0; + int retrySleep = TM_SLEEP; + boolean retry = false; + try { Table tbl = connection.getTable(TableName.valueOf(tblName)); String rowkey = "0"; CoprocessorRpcChannel channel = tbl.coprocessorService(rowkey.getBytes()); org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.BlockingInterface service = - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.newBlockingStub(channel); + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService.newBlockingStub(channel); org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest.Builder request = org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyRequest.newBuilder(); String hbaseRoot = config.get("hbase.rootdir"); FileSystem fs = FileSystem.get(config); - - //Construct the HDFS dir - //find out if namespace is there - String[] parts = tblName.split(":"); - String namespacestr=""; - String fullPath = hbaseRoot + "/data/" ; - String fullPath2 = hbaseRoot + "/data/default/"; - if(fs.exists(new Path(fullPath2))) - fullPath = fullPath2; - - if(parts.length >1) //have namespace - fullPath = fullPath + parts[0] + "/" + parts[1]; - else - fullPath = fullPath + tblName; - - request.setPath(fullPath); - request.setPolicy(policy); - - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse ret = - service.setStoragePolicy(null,request.build()); - - //handle result and error - if( ret == null) - { - LOG.error("setStoragePolicy Response ret null "); - throw new IOException("coprocessor not response"); - } - else if (ret.getStatus() == false) - { - LOG.error("setStoragePolicy Response ret false: " + ret.getException()); - throw new IOException(ret.getException()); - } + //Construct the HDFS dir + //find out if namespace is there + String[] parts = tblName.split(":"); + String namespacestr=""; + String fullPath = hbaseRoot + "/data/" ; + String fullPath2 = hbaseRoot + "/data/default/"; + if(fs.exists(new Path(fullPath2))) + fullPath = fullPath2; + + if(parts.length >1) //have namespace + fullPath = fullPath + parts[0] + "/" + parts[1]; + else + fullPath = fullPath + tblName; + + request.setPath(fullPath); + request.setPolicy(policy); + + do { + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrafSetStoragePolicyResponse ret = + service.setStoragePolicy(null,request.build()); + + //handle result and error + if( ret == null) + { + LOG.error("setStoragePolicy Response ret null "); + } + else if (ret.getStatus() == false) + { + LOG.error("setStoragePolicy Response ret false: " + ret.getException()); + throw new IOException(ret.getException()); + } + if(retryCount == RETRY_ATTEMPTS) + { + throw new IOException("coprocessor not response"); + } + if (retry) + retrySleep = retry(retrySleep); + } while (retry && retryCount++ < RETRY_ATTEMPTS); } catch (Exception e) { - throw new IOException(e); + throw new IOException(e); } - - } + } }