Repository: hbase
Updated Branches:
  refs/heads/branch-1 ff3e56293 -> a817f196a


HBASE-15291 FileSystem not closed in secure bulkLoad

Signed-off-by: Ashish Singhi <ashishsin...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a817f196
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a817f196
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a817f196

Branch: refs/heads/branch-1
Commit: a817f196a19fdbe94d302e5f0e0e652457bc746d
Parents: ff3e562
Author: Ashish Singhi <ashishsin...@apache.org>
Authored: Wed Apr 11 12:59:52 2018 +0530
Committer: Ashish Singhi <ashishsin...@apache.org>
Committed: Wed Apr 11 12:59:52 2018 +0530

----------------------------------------------------------------------
 .../security/access/SecureBulkLoadEndpoint.java | 67 +++++++++++++-------
 1 file changed, 45 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a817f196/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index 37d66e5..68f31cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -236,6 +236,15 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
       done.run(CleanupBulkLoadResponse.newBuilder().build());
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
+    } finally {
+      UserGroupInformation ugi = getActiveUser().getUGI();
+      try {
+        if (!UserGroupInformation.getLoginUser().equals(ugi)) {
+          FileSystem.closeAllForUGI(ugi);
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to close FileSystem for: " + ugi, e);
+      }
     }
     done.run(null);
   }
@@ -425,7 +434,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
       }
 
       if (srcFs == null) {
-        srcFs = FileSystem.get(p.toUri(), conf);
+        srcFs = FileSystem.newInstance(p.toUri(), conf);
       }
 
       if(!isFile(p)) {
@@ -452,34 +461,48 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     @Override
     public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
       LOG.debug("Bulk Load done for: " + srcPath);
+      closeSrcFs();
+    }
+
+    private void closeSrcFs() throws IOException {
+      if (srcFs != null) {
+        srcFs.close();
+        srcFs = null;
+      }
     }
 
     @Override
     public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
-      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
-        // files are copied so no need to move them back
-        return;
-      }
-      Path p = new Path(srcPath);
-      Path stageP = new Path(stagingDir,
-          new Path(Bytes.toString(family), p.getName()));
+      try {
+        Path p = new Path(srcPath);
+        if (srcFs == null) {
+          srcFs = FileSystem.newInstance(p.toUri(), conf);
+        }
+        if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
+          // files are copied so no need to move them back
+          return;
+        }
+        Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
 
-      // In case of Replication for bulk load files, hfiles are not renamed by end point during
-      // prepare stage, so no need of rename here again
-      if (p.equals(stageP)) {
-        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
-        return;
-      }
+        // In case of Replication for bulk load files, hfiles are not renamed by end point during
+        // prepare stage, so no need of rename here again
+        if (p.equals(stageP)) {
+          LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+          return;
+        }
 
-      LOG.debug("Moving " + stageP + " back to " + p);
-      if(!fs.rename(stageP, p))
-        throw new IOException("Failed to move HFile: " + stageP + " to " + p);
+        LOG.debug("Moving " + stageP + " back to " + p);
+        if (!fs.rename(stageP, p))
+          throw new IOException("Failed to move HFile: " + stageP + " to " + p);
 
-      // restore original permission
-      if (origPermissions.containsKey(srcPath)) {
-        fs.setPermission(p, origPermissions.get(srcPath));
-      } else {
-        LOG.warn("Can't find previous permission for path=" + srcPath);
+        // restore original permission
+        if (origPermissions.containsKey(srcPath)) {
+          fs.setPermission(p, origPermissions.get(srcPath));
+        } else {
+          LOG.warn("Can't find previous permission for path=" + srcPath);
+        }
+      } finally {
+        closeSrcFs();
       }
     }
 

Reply via email to