This is an automated email from the ASF dual-hosted git repository. szita pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push: new 26be5e8 HIVE-21375: Closing TransactionBatch closes FileSystem for other batches in Hive streaming v1 (Adam Szita, reviewed by Peter Vary) 26be5e8 is described below commit 26be5e8bd21d066fb9e90c8936c4b79a058ec0ec Author: Adam Szita <40628386+sz...@users.noreply.github.com> AuthorDate: Fri Oct 2 10:19:42 2020 +0200 HIVE-21375: Closing TransactionBatch closes FileSystem for other batches in Hive streaming v1 (Adam Szita, reviewed by Peter Vary) --- .../hive/hcatalog/streaming/HiveEndPoint.java | 58 ++++++++++++++++++---- .../hive/hcatalog/streaming/TestStreaming.java | 50 +++++++++++++++++-- 2 files changed, 94 insertions(+), 14 deletions(-) diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 3604630..6644b11 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -60,8 +60,11 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Information about the hive end point (i.e. table or partition) to write to. @@ -75,6 +78,10 @@ public class HiveEndPoint { public final String database; public final String table; public final ArrayList<String> partitionVals; + // Tracks connection count for UGIs, so that no premature FileSystem.closeAllForUGI can occur, and we avoid causing + // ClosedChannelException in a multithreaded (writing multiple partitions at a time) setup. + private final static HashMap<UserGroupInformation, Long> ugiConnectionRefCount = new HashMap<>(); + private final static Lock refCountLock = new ReentrantLock(); static final private Logger LOG = LoggerFactory.getLogger(HiveEndPoint.class.getName()); @@ -332,6 +339,9 @@ public class HiveEndPoint { if (createPart && !endPoint.partitionVals.isEmpty()) { createPartitionIfNotExists(endPoint, msClient, conf); } + if (this.ugi != null) { + incRefForUgi(ugi); + } } /** @@ -394,10 +404,12 @@ public class HiveEndPoint { return null; } }); - try { - FileSystem.closeAllForUGI(ugi); - } catch (IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception); + if (decRefForUgi(ugi) == 0) { + try { + FileSystem.closeAllForUGI(ugi); + } catch (IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception); + } } } catch (IOException e) { LOG.error("Error closing connection to " + endPt, e); @@ -1043,11 +1055,6 @@ public class HiveEndPoint { } } ); - try { - FileSystem.closeAllForUGI(ugi); - } catch (IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception); - } } catch (IOException e) { throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username + "' on endPoint :" + endPt, e); @@ -1107,4 +1114,37 @@ public class HiveEndPoint { conf.setBoolVar(var, value); } + private static void incRefForUgi(UserGroupInformation ugi) { + refCountLock.lock(); + try { + Long prevCount = ugiConnectionRefCount.putIfAbsent(ugi, 1L); + if (prevCount != null) { + ugiConnectionRefCount.put(ugi, prevCount + 1L); + } + } finally { + refCountLock.unlock(); + } + } + + private static long decRefForUgi(UserGroupInformation ugi) { + refCountLock.lock(); + try { + Long prevCount = ugiConnectionRefCount.get(ugi); + if (prevCount == null) { + throw new IllegalStateException("Cannot decrement connection count on missing counter for UGI: " + ugi); + } + long newCount = prevCount - 1L; + if (newCount > 0) { + ugiConnectionRefCount.put(ugi, newCount); + } else if (newCount == 0) { + ugiConnectionRefCount.remove(ugi); + } else { + throw new IllegalStateException("Negative connection count for UGI: " + ugi); + } + return newCount; + } finally { + refCountLock.unlock(); + } + } + } // class HiveEndPoint diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 137323c..a953872 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -27,6 +27,8 @@ import java.io.PrintStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -98,6 +100,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.orc.impl.OrcAcidUtils; import org.apache.orc.tools.FileDump; import org.apache.thrift.TException; + +import com.google.common.collect.Lists; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -877,11 +882,15 @@ public class TestStreaming { testTransactionBatchCommit_Delimited(Utils.getUGI()); } private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception { - HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, - partitionVals); + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals); StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection); + // 2nd (parallel) connection opened for a different partition (likely case with Storm) + HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName, tblName, + Lists.newArrayList("Europe", "Hungary")); + StreamingConnection connection2 = endPt2.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName()); + // 1st Txn TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); @@ -895,6 +904,16 @@ public class TestStreaming { Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); + // 2nd connection close - test here is to make sure closing this does not affect the other connection - i.e. FS + // should not be closed yet + if (ugi != null) { + FileSystem fs = getFs(ugi, conf); + connection2.close(); + Assert.assertFalse("FS should not be closed yet", isFsClosedForUgi(fs, ugi)); + } else { + connection2.close(); + } + // 2nd Txn txnBatch.beginNextTransaction(); Assert.assertEquals(TransactionBatch.TxnState.OPEN @@ -913,9 +932,14 @@ public class TestStreaming { Assert.assertEquals(TransactionBatch.TxnState.INACTIVE , txnBatch.getCurrentTransactionState()); - - connection.close(); - + // Last connection for this UGI in the 'process', should run FS.closeAllForUGI + if (ugi != null) { + FileSystem fs = getFs(ugi, conf); + connection.close(); + Assert.assertTrue("FS should be closed", isFsClosedForUgi(fs, ugi)); + } else { + connection.close(); + } // To Unpartitioned table endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); @@ -935,6 +959,22 @@ public class TestStreaming { connection.close(); } + private static FileSystem getFs(UserGroupInformation ugi, Configuration conf) throws Exception { + return ugi.doAs( + new PrivilegedExceptionAction<FileSystem>() { + @Override + public FileSystem run() throws Exception { + return FileSystem.get(conf); + } + }); + } + + private static boolean isFsClosedForUgi(FileSystem fs, UserGroupInformation ugi) throws Exception { + FileSystem newFS = getFs(ugi, fs.getConf()); + // With FS cache being turned on, if we got a new FS, it means the older fs instance was closed + return newFS != fs; + } + @Test public void testTransactionBatchCommit_Regex() throws Exception { testTransactionBatchCommit_Regex(null);