HIVE-10835: Concurrency issues in JDBC driver (Chaoyu Tang reviewed by Vaibhav 
Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82e79772
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82e79772
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82e79772

Branch: refs/heads/llap
Commit: 82e797728c78f5fbeeb66a7d94b21296b37ebb40
Parents: 4bffffb
Author: Vaibhav Gumashta <[email protected]>
Authored: Sat May 30 13:38:34 2015 -0700
Committer: Vaibhav Gumashta <[email protected]>
Committed: Sat May 30 13:38:34 2015 -0700

----------------------------------------------------------------------
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   | 123 +++++++++++++++++++
 .../org/apache/hive/jdbc/HiveConnection.java    |  45 ++++++-
 .../apache/hive/jdbc/HiveQueryResultSet.java    |  25 +---
 .../org/apache/hive/jdbc/HiveStatement.java     |  24 +---
 4 files changed, 171 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 7210480..306e3fe 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -29,9 +29,21 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -110,6 +122,117 @@ public class TestJdbcWithMiniHS2 {
     stmt.close();
   }
 
+  @Test
+  public void testConcurrentStatements() throws Exception {
+    String tableName = "testConcurrentStatements";
+    Statement stmt = hs2Conn.createStatement();
+
+    // create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    stmt.execute("CREATE TABLE " + tableName
+        + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' 
test table'");
+
+    // load data
+    stmt.execute("load data local inpath '"
+        + dataFilePath.toString() + "' into table " + tableName);
+
+    ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+    assertTrue(res.next());
+    res.close();
+    stmt.close();
+
+    // Start concurrent testing
+    int POOL_SIZE = 100;
+    int TASK_COUNT = 300;
+
+    SynchronousQueue<Runnable> executorQueue = new 
SynchronousQueue<Runnable>();
+    ExecutorService workers = new ThreadPoolExecutor(1, POOL_SIZE, 20, 
TimeUnit.SECONDS, executorQueue);
+    List<Future<Boolean>> list = new ArrayList<Future<Boolean>>();
+    int i = 0;
+    while(i < TASK_COUNT) {
+      try {
+        Future<Boolean> future = workers.submit(new JDBCTask(hs2Conn, i, 
tableName));
+        list.add(future);
+        i++;
+      } catch (RejectedExecutionException ree) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    for (Future<Boolean> future : list) {
+      try {
+        Boolean result = future.get(30, TimeUnit.SECONDS);
+        assertTrue(result);
+      } catch (ExecutionException ee) {
+        fail("Concurrent Statement failed: " + ee.getCause());
+      } catch (TimeoutException te) {
+        System.out.println("Task was timeout after 30 second: " + te);
+      } catch (CancellationException ce) {
+        System.out.println("Task was interrupted: " + ce);
+      } catch (InterruptedException ie) {
+        System.out.println("Thread was interrupted: " + ie);
+      }
+    }
+    workers.shutdown();
+  }
+
+  static class JDBCTask implements Callable<Boolean> {
+    private String showsql = "show tables";
+    private String querysql;
+    private int seq = 0;
+    Connection con = null;
+    Statement stmt = null;
+    ResultSet res = null;
+
+    JDBCTask(Connection con, int seq, String tblName) {
+      this.con = con;
+      this.seq = seq;
+      querysql = "SELECT count(value) FROM " + tblName;
+    }
+
+    public Boolean call() throws SQLException {
+      int mod = seq%10;
+      try {
+        if (mod < 2) {
+          String name = con.getMetaData().getDatabaseProductName();
+        } else if (mod < 5) {
+          stmt = con.createStatement();
+          res = stmt.executeQuery(querysql);
+          while (res.next()) {
+            res.getInt(1);
+          }
+        } else if (mod < 7) {
+          res = con.getMetaData().getSchemas();
+          if (res.next()) {
+            res.getString(1);
+          }
+        } else {
+          stmt = con.createStatement();
+          res = stmt.executeQuery(showsql);
+          if (res.next()) {
+            res.getString(1);
+          }
+        }
+        return new Boolean(true);
+      } finally {
+        try {
+          if (res != null) {
+            res.close();
+            res = null;
+          }
+          if (stmt != null) {
+            stmt.close();
+            stmt = null;
+          }
+        } catch (SQLException sqle1) {
+        }
+      }
+    }
+  }
 
   /**   This test is to connect to any database without using the command "Use 
<<DB>>"
    *  1)connect to default database.

http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index 306a1cd..277f6d4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -20,6 +20,10 @@ package org.apache.hive.jdbc;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.security.KeyStore;
 import java.security.SecureRandom;
 import java.sql.Array;
@@ -176,7 +180,6 @@ public class HiveConnection implements java.sql.Connection {
       // set up the client
       client = new TCLIService.Client(new TBinaryProtocol(transport));
     }
-
     // add supported protocols
     supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
     supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2);
@@ -189,6 +192,9 @@ public class HiveConnection implements java.sql.Connection {
 
     // open client session
     openSession();
+
+    // Wrap the client with a thread-safe proxy to serialize the RPC calls
+    client = newSynchronizedClient(client);
   }
 
   private void openTransport() throws SQLException {
@@ -1357,4 +1363,41 @@ public class HiveConnection implements 
java.sql.Connection {
   public TProtocolVersion getProtocol() {
     return protocol;
   }
+
+  public static TCLIService.Iface newSynchronizedClient(
+      TCLIService.Iface client) {
+    return (TCLIService.Iface) Proxy.newProxyInstance(
+        HiveConnection.class.getClassLoader(),
+      new Class [] { TCLIService.Iface.class },
+      new SynchronizedHandler(client));
+  }
+
+  private static class SynchronizedHandler implements InvocationHandler {
+    private final TCLIService.Iface client;
+
+    SynchronizedHandler(TCLIService.Iface client) {
+      this.client = client;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object [] args)
+        throws Throwable {
+      try {
+        synchronized (client) {
+          return method.invoke(client, args);
+        }
+      } catch (InvocationTargetException e) {
+        // all IFace APIs throw TException
+        if (e.getTargetException() instanceof TException) {
+          throw (TException)e.getTargetException();
+        } else {
+          // should not happen
+          throw new TException("Error in calling method " + method.getName(),
+              e.getTargetException());
+        }
+      } catch (Exception e) {
+        throw new TException("Error in calling method " + method.getName(), e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
index e93795a..f6860f0 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java
@@ -78,8 +78,6 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
   private boolean fetchFirst = false;
 
   private final TProtocolVersion protocol;
-  private ReentrantLock transportLock;
-
 
   public static class Builder {
 
@@ -191,7 +189,6 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
     this.stmtHandle = builder.stmtHandle;
     this.sessHandle = builder.sessHandle;
     this.fetchSize = builder.fetchSize;
-    this.transportLock = builder.transportLock;
     columnNames = new ArrayList<String>();
     normalizedColumnNames = new ArrayList<String>();
     columnTypes = new ArrayList<String>();
@@ -252,16 +249,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
       TGetResultSetMetadataReq metadataReq = new 
TGetResultSetMetadataReq(stmtHandle);
       // TODO need session handle
       TGetResultSetMetadataResp  metadataResp;
-      if (transportLock == null) {
-        metadataResp = client.GetResultSetMetadata(metadataReq);
-      } else {
-        transportLock.lock();
-        try {
-          metadataResp = client.GetResultSetMetadata(metadataReq);
-        } finally {
-          transportLock.unlock();
-        }
-      }
+      metadataResp = client.GetResultSetMetadata(metadataReq);
       Utils.verifySuccess(metadataResp.getStatus());
 
       StringBuilder namesSb = new StringBuilder();
@@ -372,16 +360,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet {
         TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
             orientation, fetchSize);
         TFetchResultsResp fetchResp;
-        if (transportLock == null) {
-          fetchResp = client.FetchResults(fetchReq);
-        } else {
-          transportLock.lock();
-          try {
-            fetchResp = client.FetchResults(fetchReq);
-          } finally {
-            transportLock.unlock();
-          }
-        }
+        fetchResp = client.FetchResults(fetchReq);
         Utils.verifySuccessWithInfo(fetchResp.getStatus());
 
         TRowSet results = fetchResp.getResults();

http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java 
b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index 6b3d05c..170fc53 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -108,9 +108,6 @@ public class HiveStatement implements java.sql.Statement {
    */
   private boolean isExecuteStatementFailed = false;
 
-  // A fair reentrant lock
-  private ReentrantLock transportLock = new ReentrantLock(true);
-
   public HiveStatement(HiveConnection connection, TCLIService.Iface client,
       TSessionHandle sessHandle) {
     this(connection, client, sessHandle, false);
@@ -148,7 +145,6 @@ public class HiveStatement implements java.sql.Statement {
       return;
     }
 
-    transportLock.lock();
     try {
       if (stmtHandle != null) {
         TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle);
@@ -159,8 +155,6 @@ public class HiveStatement implements java.sql.Statement {
       throw e;
     } catch (Exception e) {
       throw new SQLException(e.toString(), "08S01", e);
-    } finally {
-      transportLock.unlock();
     }
     isCancelled = true;
   }
@@ -188,7 +182,6 @@ public class HiveStatement implements java.sql.Statement {
   }
 
   void closeClientOperation() throws SQLException {
-    transportLock.lock();
     try {
       if (stmtHandle != null) {
         TCloseOperationReq closeReq = new TCloseOperationReq(stmtHandle);
@@ -199,8 +192,6 @@ public class HiveStatement implements java.sql.Statement {
       throw e;
     } catch (Exception e) {
       throw new SQLException(e.toString(), "08S01", e);
-    } finally {
-      transportLock.unlock();
     }
     isQueryClosed = true;
     isExecuteStatementFailed = false;
@@ -251,7 +242,6 @@ public class HiveStatement implements java.sql.Statement {
     execReq.setRunAsync(true);
     execReq.setConfOverlay(sessConf);
 
-    transportLock.lock();
     try {
       TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
       Utils.verifySuccessWithInfo(execResp.getStatus());
@@ -263,8 +253,6 @@ public class HiveStatement implements java.sql.Statement {
     } catch (Exception ex) {
       isExecuteStatementFailed = true;
       throw new SQLException(ex.toString(), "08S01", ex);
-    } finally {
-      transportLock.unlock();
     }
 
     TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
@@ -278,12 +266,7 @@ public class HiveStatement implements java.sql.Statement {
          * For an async SQLOperation, GetOperationStatus will use the long 
polling approach
          * It will essentially return after the 
HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
          */
-        transportLock.lock();
-        try {
-          statusResp = client.GetOperationStatus(statusReq);
-        } finally {
-          transportLock.unlock();
-        }
+        statusResp = client.GetOperationStatus(statusReq);
         Utils.verifySuccessWithInfo(statusResp.getStatus());
         if (statusResp.isSetOperationState()) {
           switch (statusResp.getOperationState()) {
@@ -322,7 +305,7 @@ public class HiveStatement implements java.sql.Statement {
     }
     resultSet =  new 
HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
         .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
-        .setScrollable(isScrollableResultset).setTransportLock(transportLock)
+        .setScrollable(isScrollableResultset)
         .build();
     return true;
   }
@@ -813,7 +796,6 @@ public class HiveStatement implements java.sql.Statement {
 
     List<String> logs = new ArrayList<String>();
     TFetchResultsResp tFetchResultsResp = null;
-    transportLock.lock();
     try {
       if (stmtHandle != null) {
         TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle,
@@ -837,8 +819,6 @@ public class HiveStatement implements java.sql.Statement {
       throw e;
     } catch (Exception e) {
       throw new SQLException("Error when getting query log: " + e, e);
-    } finally {
-      transportLock.unlock();
     }
 
     RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),

Reply via email to