HIVE-17455: External LLAP client: connection to HS2 should be kept open until explicitly closed (Jason Dere, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/849fa02c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/849fa02c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/849fa02c Branch: refs/heads/hive-14535 Commit: 849fa02c97e4eba2c443db871c1e6b117750d868 Parents: aa2557b Author: Jason Dere <jd...@hortonworks.com> Authored: Wed Sep 6 15:00:00 2017 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Wed Sep 6 15:00:00 2017 -0700 ---------------------------------------------------------------------- .../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 5 + .../hadoop/hive/llap/LlapBaseInputFormat.java | 106 +++++++++++++++++-- 2 files changed, 100 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/849fa02c/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java index b5f1f13..68d2ddc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -44,6 +44,7 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -143,6 +144,7 @@ public class TestJdbcWithMiniLlap { @After public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); hs2Conn.close(); } @@ -475,6 +477,7 @@ public class TestJdbcWithMiniLlap { String url = miniHS2.getJdbcURL(); String user = System.getProperty("user.name"); String pwd = user; + String handleId = UUID.randomUUID().toString(); LlapRowInputFormat inputFormat = new LlapRowInputFormat(); @@ -484,6 +487,7 @@ public class TestJdbcWithMiniLlap { job.set(LlapBaseInputFormat.USER_KEY, user); job.set(LlapBaseInputFormat.PWD_KEY, pwd); job.set(LlapBaseInputFormat.QUERY_KEY, query); + job.set(LlapBaseInputFormat.HANDLE_ID, handleId); InputSplit[] splits = inputFormat.getSplits(job, numSplits); assertTrue(splits.length > 0); @@ -503,6 +507,7 @@ public class TestJdbcWithMiniLlap { } reader.close(); } + LlapBaseInputFormat.close(handleId); return rowCount; } http://git-wip-us.apache.org/repos/asf/hive/blob/849fa02c/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index f2d9074..215f5b1 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -31,8 +31,11 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.collections4.ListUtils; @@ -70,6 +73,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hive.common.util.ShutdownHookManager; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -92,6 +96,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class); private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + private static final Map<String, Connection> connectionMap = new ConcurrentHashMap<String, Connection>(); + private String url; // "jdbc:hive2://localhost:10000/default" private String user; // "hive", private String pwd; // "" @@ -102,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> public static final String QUERY_KEY = "llap.if.query"; public static final String USER_KEY = "llap.if.user"; public static final String PWD_KEY = "llap.if.pwd"; + public static final String HANDLE_ID = "llap.if.handleid"; public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)"; public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0]; @@ -191,6 +198,10 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> return recordReader; } + /** + * Calling getSplits() will open a HiveServer2 connection which should be closed by the calling application + * using LlapBaseInputFormat.close() when the application is done with the splits. + */ @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { List<InputSplit> ins = new ArrayList<InputSplit>(); @@ -204,25 +215,54 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> throw new IllegalStateException(); } + String handleId = job.get(HANDLE_ID); + if (handleId == null) { + handleId = UUID.randomUUID().toString(); + LOG.info("Handle ID not specified - generated handle ID {}", handleId); + } + // Check if the handle has already been used in the registry + if (connectionMap.containsKey(handleId)) { + throw new IllegalStateException("Handle ID " + handleId + " has already been used. This must be unique."); + } + try { Class.forName(driverName); } catch (ClassNotFoundException e) { throw new IOException(e); } + LOG.info("Handle ID {}: query={}", handleId, query); String escapedQuery = StringUtils.escapeString(query, ESCAPE_CHAR, escapedChars); String sql = String.format(SPLIT_QUERY, escapedQuery, numSplits); - try ( - Connection con = DriverManager.getConnection(url,user,pwd); - Statement stmt = con.createStatement(); - ResultSet res = stmt.executeQuery(sql); - ) { - while (res.next()) { - // deserialize split - DataInput in = new DataInputStream(res.getBinaryStream(1)); - InputSplitWithLocationInfo is = new LlapInputSplit(); - is.readFields(in); - ins.add(is); + try { + Connection conn = DriverManager.getConnection(url,user,pwd); + try ( + Statement stmt = conn.createStatement(); + ) { + ResultSet res = stmt.executeQuery(sql); + while (res.next()) { + // deserialize split + DataInput in = new DataInputStream(res.getBinaryStream(1)); + InputSplitWithLocationInfo is = new LlapInputSplit(); + is.readFields(in); + ins.add(is); + } + res.close(); + } catch (Exception e) { + LOG.error("Closing connection due to error", e); + conn.close(); + throw e; + } + + // Keep connection open to hang on to associated resources (temp tables, locks). + // Save to connectionMap so it can be closed at user's convenience. + Connection putResult = connectionMap.putIfAbsent(handleId, conn); + // Hopefully no one has used the same handle ID during the time that we executed the statement. + if (putResult != null) { + String msg = "Handle ID " + handleId + " has already been used. This must be unique."; + LOG.error(msg); + conn.close(); + throw new IllegalStateException(msg); } } catch (Exception e) { throw new IOException(e); @@ -230,6 +270,50 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> return ins.toArray(new InputSplit[ins.size()]); } + /** + * Close the connection associated with the handle ID, if getSplits() was configured with a handle ID. + * Call when the application is done using the splits generated by getSplits(). + * @param handleId Handle ID used in configuration for getSplits() + * @throws IOException + */ + public static void close(String handleId) throws IOException { + Connection conn = connectionMap.remove(handleId); + if (conn != null) { + try { + LOG.debug("Closing connection for handle ID {}", handleId); + conn.close(); + } catch (Exception err) { + throw new IOException(err); + } + } else { + LOG.info("No connection found for handle ID {}", handleId); + } + } + + /** + * Close all outstanding connections created by getSplits() calls + */ + public static void closeAll() { + LOG.debug("Closing all handles"); + for (String handleId : connectionMap.keySet()) { + try { + close(handleId); + } catch (Exception err) { + LOG.error("Error closing handle ID " + handleId, err); + } + } + } + + static { + // Shutdown hook to clean up resources at process end. + ShutdownHookManager.addShutdownHook(new Runnable() { + @Override + public void run() { + closeAll(); + } + }); + } + private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException { LlapRegistryService registryService = LlapRegistryService.getClient(job); String host = llapSplit.getLocations()[0];