HIVE-17455: External LLAP client: connection to HS2 should be kept open until 
explicitly closed (Jason Dere, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/849fa02c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/849fa02c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/849fa02c

Branch: refs/heads/hive-14535
Commit: 849fa02c97e4eba2c443db871c1e6b117750d868
Parents: aa2557b
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed Sep 6 15:00:00 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed Sep 6 15:00:00 2017 -0700

----------------------------------------------------------------------
 .../apache/hive/jdbc/TestJdbcWithMiniLlap.java  |   5 +
 .../hadoop/hive/llap/LlapBaseInputFormat.java   | 106 +++++++++++++++++--
 2 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/849fa02c/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index b5f1f13..68d2ddc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -143,6 +144,7 @@ public class TestJdbcWithMiniLlap {
 
   @After
   public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
     hs2Conn.close();
   }
 
@@ -475,6 +477,7 @@ public class TestJdbcWithMiniLlap {
     String url = miniHS2.getJdbcURL();
     String user = System.getProperty("user.name");
     String pwd = user;
+    String handleId = UUID.randomUUID().toString();
 
     LlapRowInputFormat inputFormat = new LlapRowInputFormat();
 
@@ -484,6 +487,7 @@ public class TestJdbcWithMiniLlap {
     job.set(LlapBaseInputFormat.USER_KEY, user);
     job.set(LlapBaseInputFormat.PWD_KEY, pwd);
     job.set(LlapBaseInputFormat.QUERY_KEY, query);
+    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
 
     InputSplit[] splits = inputFormat.getSplits(job, numSplits);
     assertTrue(splits.length > 0);
@@ -503,6 +507,7 @@ public class TestJdbcWithMiniLlap {
       }
       reader.close();
     }
+    LlapBaseInputFormat.close(handleId);
 
     return rowCount;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/849fa02c/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index f2d9074..215f5b1 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -31,8 +31,11 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.collections4.ListUtils;
@@ -70,6 +73,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -92,6 +96,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
 
   private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+  private static final Map<String, Connection> connectionMap = new ConcurrentHashMap<String, Connection>();
+
   private String url;  // "jdbc:hive2://localhost:10000/default"
   private String user; // "hive",
   private String pwd;  // ""
@@ -102,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   public static final String QUERY_KEY = "llap.if.query";
   public static final String USER_KEY = "llap.if.user";
   public static final String PWD_KEY = "llap.if.pwd";
+  public static final String HANDLE_ID = "llap.if.handleid";
 
   public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
   public static final LlapServiceInstance[] serviceInstanceArray = new LlapServiceInstance[0];
@@ -191,6 +198,10 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     return recordReader;
   }
 
+  /**
+   * Calling getSplits() will open a HiveServer2 connection which should be closed by the calling application
+   * using LlapBaseInputFormat.close() when the application is done with the splits.
+   */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     List<InputSplit> ins = new ArrayList<InputSplit>();
@@ -204,25 +215,54 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
       throw new IllegalStateException();
     }
 
+    String handleId = job.get(HANDLE_ID);
+    if (handleId == null) {
+      handleId = UUID.randomUUID().toString();
+      LOG.info("Handle ID not specified - generated handle ID {}", handleId);
+    }
+    // Check if the handle has already been used in the registry
+    if (connectionMap.containsKey(handleId)) {
+      throw new IllegalStateException("Handle ID " + handleId + " has already been used. This must be unique.");
+    }
+
     try {
       Class.forName(driverName);
     } catch (ClassNotFoundException e) {
       throw new IOException(e);
     }
 
+    LOG.info("Handle ID {}: query={}", handleId, query);
     String escapedQuery = StringUtils.escapeString(query, ESCAPE_CHAR, escapedChars);
     String sql = String.format(SPLIT_QUERY, escapedQuery, numSplits);
-    try (
-      Connection con = DriverManager.getConnection(url,user,pwd);
-      Statement stmt = con.createStatement();
-      ResultSet res = stmt.executeQuery(sql);
-    ) {
-      while (res.next()) {
-        // deserialize split
-        DataInput in = new DataInputStream(res.getBinaryStream(1));
-        InputSplitWithLocationInfo is = new LlapInputSplit();
-        is.readFields(in);
-        ins.add(is);
+    try {
+      Connection conn = DriverManager.getConnection(url,user,pwd);
+      try (
+        Statement stmt = conn.createStatement();
+      ) {
+        ResultSet res = stmt.executeQuery(sql);
+        while (res.next()) {
+          // deserialize split
+          DataInput in = new DataInputStream(res.getBinaryStream(1));
+          InputSplitWithLocationInfo is = new LlapInputSplit();
+          is.readFields(in);
+          ins.add(is);
+        }
+        res.close();
+      } catch (Exception e) {
+        LOG.error("Closing connection due to error", e);
+        conn.close();
+        throw e;
+      }
+
+      // Keep connection open to hang on to associated resources (temp tables, locks).
+      // Save to connectionMap so it can be closed at user's convenience.
+      Connection putResult = connectionMap.putIfAbsent(handleId, conn);
+      // Hopefully no one has used the same handle ID during the time that we executed the statement.
+      if (putResult != null) {
+        String msg = "Handle ID " + handleId + " has already been used. This must be unique.";
+        LOG.error(msg);
+        conn.close();
+        throw new IllegalStateException(msg);
       }
     } catch (Exception e) {
       throw new IOException(e);
@@ -230,6 +270,50 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     return ins.toArray(new InputSplit[ins.size()]);
   }
 
+  /**
+   * Close the connection associated with the handle ID, if getSplits() was configured with a handle ID.
+   * Call when the application is done using the splits generated by getSplits().
+   * @param handleId Handle ID used in configuration for getSplits()
+   * @throws IOException
+   */
+  public static void close(String handleId) throws IOException {
+    Connection conn = connectionMap.remove(handleId);
+    if (conn != null) {
+      try {
+        LOG.debug("Closing connection for handle ID {}", handleId);
+        conn.close();
+      } catch (Exception err) {
+        throw new IOException(err);
+      }
+    } else {
+      LOG.info("No connection found for handle ID {}", handleId);
+    }
+  }
+
+  /**
+   * Close all outstanding connections created by getSplits() calls
+   */
+  public static void closeAll() {
+    LOG.debug("Closing all handles");
+    for (String handleId : connectionMap.keySet()) {
+      try {
+        close(handleId);
+      } catch (Exception err) {
+        LOG.error("Error closing handle ID " + handleId, err);
+      }
+    }
+  }
+
+  static {
+    // Shutdown hook to clean up resources at process end.
+    ShutdownHookManager.addShutdownHook(new Runnable() {
+      @Override
+      public void run() {
+        closeAll();
+      }
+    });
+  }
+
   private LlapServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
     LlapRegistryService registryService = LlapRegistryService.getClient(job);
     String host = llapSplit.getLocations()[0];

Reply via email to