HIVE-10835: Concurrency issues in JDBC driver (Chaoyu Tang reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82e79772 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82e79772 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82e79772 Branch: refs/heads/llap Commit: 82e797728c78f5fbeeb66a7d94b21296b37ebb40 Parents: 4bffffb Author: Vaibhav Gumashta <[email protected]> Authored: Sat May 30 13:38:34 2015 -0700 Committer: Vaibhav Gumashta <[email protected]> Committed: Sat May 30 13:38:34 2015 -0700 ---------------------------------------------------------------------- .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 123 +++++++++++++++++++ .../org/apache/hive/jdbc/HiveConnection.java | 45 ++++++- .../apache/hive/jdbc/HiveQueryResultSet.java | 25 +--- .../org/apache/hive/jdbc/HiveStatement.java | 24 +--- 4 files changed, 171 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 7210480..306e3fe 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -29,9 +29,21 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -110,6 +122,117 @@ public class TestJdbcWithMiniHS2 { stmt.close(); } + @Test + public void testConcurrentStatements() throws Exception { + String tableName = "testConcurrentStatements"; + Statement stmt = hs2Conn.createStatement(); + + // create table + stmt.execute("DROP TABLE IF EXISTS " + tableName); + stmt.execute("CREATE TABLE " + tableName + + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); + + // load data + stmt.execute("load data local inpath '" + + dataFilePath.toString() + "' into table " + tableName); + + ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName); + assertTrue(res.next()); + res.close(); + stmt.close(); + + // Start concurrent testing + int POOL_SIZE = 100; + int TASK_COUNT = 300; + + SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<Runnable>(); + ExecutorService workers = new ThreadPoolExecutor(1, POOL_SIZE, 20, TimeUnit.SECONDS, executorQueue); + List<Future<Boolean>> list = new ArrayList<Future<Boolean>>(); + int i = 0; + while(i < TASK_COUNT) { + try { + Future<Boolean> future = workers.submit(new JDBCTask(hs2Conn, i, tableName)); + list.add(future); + i++; + } catch (RejectedExecutionException ree) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + for (Future<Boolean> future : list) { + try { + Boolean result = future.get(30, TimeUnit.SECONDS); + assertTrue(result); + } catch (ExecutionException ee) { + fail("Concurrent Statement failed: " + ee.getCause()); + } catch (TimeoutException te) { + System.out.println("Task was timeout after 30 second: " + te); + } catch (CancellationException ce) { + System.out.println("Task was interrupted: " + ce); + } catch (InterruptedException ie) { + System.out.println("Thread was interrupted: " + ie); + } + } + workers.shutdown(); + } + + static class JDBCTask implements Callable<Boolean> { + private String showsql = "show tables"; + private String querysql; + private int seq = 0; + Connection con = null; + Statement stmt = null; + ResultSet res = null; + + JDBCTask(Connection con, int seq, String tblName) { + this.con = con; + this.seq = seq; + querysql = "SELECT count(value) FROM " + tblName; + } + + public Boolean call() throws SQLException { + int mod = seq%10; + try { + if (mod < 2) { + String name = con.getMetaData().getDatabaseProductName(); + } else if (mod < 5) { + stmt = con.createStatement(); + res = stmt.executeQuery(querysql); + while (res.next()) { + res.getInt(1); + } + } else if (mod < 7) { + res = con.getMetaData().getSchemas(); + if (res.next()) { + res.getString(1); + } + } else { + stmt = con.createStatement(); + res = stmt.executeQuery(showsql); + if (res.next()) { + res.getString(1); + } + } + return new Boolean(true); + } finally { + try { + if (res != null) { + res.close(); + res = null; + } + if (stmt != null) { + stmt.close(); + stmt = null; + } + } catch (SQLException sqle1) { + } + } + } + } /** This test is to connect to any database without using the command "Use <<DB>>" * 1)connect to default database. http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 306a1cd..277f6d4 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -20,6 +20,10 @@ package org.apache.hive.jdbc; import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.security.KeyStore; import java.security.SecureRandom; import java.sql.Array; @@ -176,7 +180,6 @@ public class HiveConnection implements java.sql.Connection { // set up the client client = new TCLIService.Client(new TBinaryProtocol(transport)); } - // add supported protocols supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1); supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2); @@ -189,6 +192,9 @@ public class HiveConnection implements java.sql.Connection { // open client session openSession(); + + // Wrap the client with a thread-safe proxy to serialize the RPC calls + client = newSynchronizedClient(client); } private void openTransport() throws SQLException { @@ -1357,4 +1363,41 @@ public class HiveConnection implements java.sql.Connection { public TProtocolVersion getProtocol() { return protocol; } + + public static TCLIService.Iface newSynchronizedClient( + TCLIService.Iface client) { + return (TCLIService.Iface) Proxy.newProxyInstance( + HiveConnection.class.getClassLoader(), + new Class [] { TCLIService.Iface.class }, + new SynchronizedHandler(client)); + } + + private static class SynchronizedHandler implements InvocationHandler { + private final TCLIService.Iface client; + + SynchronizedHandler(TCLIService.Iface client) { + this.client = client; + } + + @Override + public Object invoke(Object proxy, Method method, Object [] args) + throws Throwable { + try { + synchronized (client) { + return method.invoke(client, args); + } + } catch (InvocationTargetException e) { + // all IFace APIs throw TException + if (e.getTargetException() instanceof TException) { + throw (TException)e.getTargetException(); + } else { + // should not happen + throw new TException("Error in calling method " + method.getName(), + e.getTargetException()); + } + } catch (Exception e) { + throw new TException("Error in calling method " + method.getName(), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index e93795a..f6860f0 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -78,8 +78,6 @@ public class HiveQueryResultSet extends HiveBaseResultSet { private boolean fetchFirst = false; private final TProtocolVersion protocol; - private ReentrantLock transportLock; - public static class Builder { @@ -191,7 +189,6 @@ public class HiveQueryResultSet extends HiveBaseResultSet { this.stmtHandle = builder.stmtHandle; this.sessHandle = builder.sessHandle; this.fetchSize = builder.fetchSize; - this.transportLock = builder.transportLock; columnNames = new ArrayList<String>(); normalizedColumnNames = new ArrayList<String>(); columnTypes = new ArrayList<String>(); @@ -252,16 +249,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(stmtHandle); // TODO need session handle TGetResultSetMetadataResp metadataResp; - if (transportLock == null) { - metadataResp = client.GetResultSetMetadata(metadataReq); - } else { - transportLock.lock(); - try { - metadataResp = client.GetResultSetMetadata(metadataReq); - } finally { - transportLock.unlock(); - } - } + metadataResp = client.GetResultSetMetadata(metadataReq); Utils.verifySuccess(metadataResp.getStatus()); StringBuilder namesSb = new StringBuilder(); @@ -372,16 +360,7 @@ public class HiveQueryResultSet extends HiveBaseResultSet { TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize); TFetchResultsResp fetchResp; - if (transportLock == null) { - fetchResp = client.FetchResults(fetchReq); - } else { - transportLock.lock(); - try { - fetchResp = client.FetchResults(fetchReq); - } finally { - transportLock.unlock(); - } - } + fetchResp = client.FetchResults(fetchReq); Utils.verifySuccessWithInfo(fetchResp.getStatus()); TRowSet results = fetchResp.getResults(); http://git-wip-us.apache.org/repos/asf/hive/blob/82e79772/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 6b3d05c..170fc53 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -108,9 +108,6 @@ public class HiveStatement implements java.sql.Statement { */ private boolean isExecuteStatementFailed = false; - // A fair reentrant lock - private ReentrantLock transportLock = new ReentrantLock(true); - public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { this(connection, client, sessHandle, false); @@ -148,7 +145,6 @@ public class HiveStatement implements java.sql.Statement { return; } - transportLock.lock(); try { if (stmtHandle != null) { TCancelOperationReq cancelReq = new TCancelOperationReq(stmtHandle); @@ -159,8 +155,6 @@ public class HiveStatement implements java.sql.Statement { throw e; } catch (Exception e) { throw new SQLException(e.toString(), "08S01", e); - } finally { - transportLock.unlock(); } isCancelled = true; } @@ -188,7 +182,6 @@ public class HiveStatement implements java.sql.Statement { } void closeClientOperation() throws SQLException { - transportLock.lock(); try { if (stmtHandle != null) { TCloseOperationReq closeReq = new TCloseOperationReq(stmtHandle); @@ -199,8 +192,6 @@ public class HiveStatement implements java.sql.Statement { throw e; } catch (Exception e) { throw new SQLException(e.toString(), "08S01", e); - } finally { - transportLock.unlock(); } isQueryClosed = true; isExecuteStatementFailed = false; @@ -251,7 +242,6 @@ public class HiveStatement implements java.sql.Statement { execReq.setRunAsync(true); execReq.setConfOverlay(sessConf); - transportLock.lock(); try { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); @@ -263,8 +253,6 @@ public class HiveStatement implements java.sql.Statement { } catch (Exception ex) { isExecuteStatementFailed = true; throw new SQLException(ex.toString(), "08S01", ex); - } finally { - transportLock.unlock(); } TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); @@ -278,12 +266,7 @@ public class HiveStatement implements java.sql.Statement { * For an async SQLOperation, GetOperationStatus will use the long polling approach * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ - transportLock.lock(); - try { - statusResp = client.GetOperationStatus(statusReq); - } finally { - transportLock.unlock(); - } + statusResp = client.GetOperationStatus(statusReq); Utils.verifySuccessWithInfo(statusResp.getStatus()); if (statusResp.isSetOperationState()) { switch (statusResp.getOperationState()) { @@ -322,7 +305,7 @@ public class HiveStatement implements java.sql.Statement { } resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) - .setScrollable(isScrollableResultset).setTransportLock(transportLock) + .setScrollable(isScrollableResultset) .build(); return true; } @@ -813,7 +796,6 @@ public class HiveStatement implements java.sql.Statement { List<String> logs = new ArrayList<String>(); TFetchResultsResp tFetchResultsResp = null; - transportLock.lock(); try { if (stmtHandle != null) { TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(stmtHandle, @@ -837,8 +819,6 @@ public class HiveStatement implements java.sql.Statement { throw e; } catch (Exception e) { throw new SQLException("Error when getting query log: " + e, e); - } finally { - transportLock.unlock(); } RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
