This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 26be5e8  HIVE-21375: Closing TransactionBatch closes FileSystem for other batches in Hive streaming v1 (Adam Szita, reviewed by Peter Vary)
26be5e8 is described below

commit 26be5e8bd21d066fb9e90c8936c4b79a058ec0ec
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Fri Oct 2 10:19:42 2020 +0200

    HIVE-21375: Closing TransactionBatch closes FileSystem for other batches in Hive streaming v1 (Adam Szita, reviewed by Peter Vary)
---
 .../hive/hcatalog/streaming/HiveEndPoint.java      | 58 ++++++++++++++++++----
 .../hive/hcatalog/streaming/TestStreaming.java     | 50 +++++++++++++++++--
 2 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 3604630..6644b11 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -60,8 +60,11 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -75,6 +78,10 @@ public class HiveEndPoint {
   public final String database;
   public final String table;
   public final ArrayList<String> partitionVals;
+  // Tracks connection count for UGIs, so that no premature FileSystem.closeAllForUGI can occur, and we avoid causing
+  // ClosedChannelException in a multithreaded (writing multiple partitions at a time) setup.
+  private final static HashMap<UserGroupInformation, Long> 
ugiConnectionRefCount = new HashMap<>();
+  private final static Lock refCountLock = new ReentrantLock();
 
 
   static final private Logger LOG = 
LoggerFactory.getLogger(HiveEndPoint.class.getName());
@@ -332,6 +339,9 @@ public class HiveEndPoint {
       if (createPart && !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
+      if (this.ugi != null) {
+        incRefForUgi(ugi);
+      }
     }
 
     /**
@@ -394,10 +404,12 @@ public class HiveEndPoint {
                 return null;
               }
             });
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi, 
exception);
+        if (decRefForUgi(ugi) == 0) {
+          try {
+              FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: " + 
ugi, exception);
+          }
         }
       } catch (IOException e) {
         LOG.error("Error closing connection to " + endPt, e);
@@ -1043,11 +1055,6 @@ public class HiveEndPoint {
                   }
                 }
         );
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi, 
exception);
-        }
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed closing Txn Batch as user '" + 
username +
                 "' on  endPoint :" + endPt, e);
@@ -1107,4 +1114,37 @@ public class HiveEndPoint {
     conf.setBoolVar(var, value);
   }
 
+  private static void incRefForUgi(UserGroupInformation ugi) {
+    refCountLock.lock();
+    try {
+      Long prevCount = ugiConnectionRefCount.putIfAbsent(ugi, 1L);
+      if (prevCount != null) {
+        ugiConnectionRefCount.put(ugi, prevCount + 1L);
+      }
+    } finally {
+      refCountLock.unlock();
+    }
+  }
+
+  private static long decRefForUgi(UserGroupInformation ugi) {
+    refCountLock.lock();
+    try {
+      Long prevCount = ugiConnectionRefCount.get(ugi);
+      if (prevCount == null) {
+        throw new IllegalStateException("Cannot decrement connection count on 
missing counter for UGI: " + ugi);
+      }
+      long newCount = prevCount - 1L;
+      if (newCount > 0) {
+        ugiConnectionRefCount.put(ugi, newCount);
+      } else if (newCount == 0) {
+        ugiConnectionRefCount.remove(ugi);
+      } else {
+        throw new IllegalStateException("Negative connection count for UGI: " 
+ ugi);
+      }
+      return newCount;
+    } finally {
+      refCountLock.unlock();
+    }
+  }
+
 }  // class HiveEndPoint
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 137323c..a953872 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -27,6 +27,8 @@ import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -98,6 +100,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
+
+import com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -877,11 +882,15 @@ public class TestStreaming {
     testTransactionBatchCommit_Delimited(Utils.getUGI());
   }
   private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) 
throws Exception {
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
-      partitionVals);
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, 
partitionVals);
     StreamingConnection connection = endPt.newConnection(true, conf, ugi, 
"UT_" + Thread.currentThread().getName());
     DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", 
endPt, conf, connection);
 
+    // 2nd (parallel) connection opened for a different partition (likely case with Storm)
+    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName, tblName,
+        Lists.newArrayList("Europe", "Hungary"));
+    StreamingConnection connection2 = endPt2.newConnection(true, conf, ugi, 
"UT_" + Thread.currentThread().getName());
+
     // 1st Txn
     TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
     txnBatch.beginNextTransaction();
@@ -895,6 +904,16 @@ public class TestStreaming {
     Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
       , txnBatch.getCurrentTransactionState());
 
+    // 2nd connection close - test here is to make sure closing this does not affect the other connection - i.e. FS
+    // should not be closed yet
+    if (ugi != null) {
+      FileSystem fs = getFs(ugi, conf);
+      connection2.close();
+      Assert.assertFalse("FS should not be closed yet", isFsClosedForUgi(fs, 
ugi));
+    } else {
+      connection2.close();
+    }
+
     // 2nd Txn
     txnBatch.beginNextTransaction();
     Assert.assertEquals(TransactionBatch.TxnState.OPEN
@@ -913,9 +932,14 @@ public class TestStreaming {
     Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
       , txnBatch.getCurrentTransactionState());
 
-
-    connection.close();
-
+    // Last connection for this UGI in the 'process', should run FS.closeAllForUGI
+    if (ugi != null) {
+      FileSystem fs = getFs(ugi, conf);
+      connection.close();
+      Assert.assertTrue("FS should be closed", isFsClosedForUgi(fs, ugi));
+    } else {
+      connection.close();
+    }
 
     // To Unpartitioned table
     endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
@@ -935,6 +959,22 @@ public class TestStreaming {
     connection.close();
   }
 
+  private static FileSystem getFs(UserGroupInformation ugi, Configuration 
conf) throws Exception {
+    return ugi.doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+          @Override
+          public FileSystem run() throws Exception {
+            return FileSystem.get(conf);
+          }
+        });
+  }
+
+  private static boolean isFsClosedForUgi(FileSystem fs, UserGroupInformation 
ugi) throws Exception {
+    FileSystem newFS = getFs(ugi, fs.getConf());
+    // With FS cache being turned on, if we got a new FS, it means the older fs instance was closed
+    return newFS != fs;
+  }
+
   @Test
   public void testTransactionBatchCommit_Regex() throws Exception {
     testTransactionBatchCommit_Regex(null);

Reply via email to