Repository: hbase Updated Branches: refs/heads/branch-1 24a3d6952 -> 276acdb0b
HBASE-16221 Have ThriftScanner update the ConnectionCache's last used time overtime getScannerRow() to keep the connection alive for long lived scanners Signed-off-by: Elliott Clark <ecl...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/276acdb0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/276acdb0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/276acdb0 Branch: refs/heads/branch-1 Commit: 276acdb0b0f62a4909c92334755c15c48da19595 Parents: 24a3d69 Author: Joseph Hwang <j...@fb.com> Authored: Fri Jul 22 10:38:43 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Fri Jul 22 17:58:56 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/util/ConnectionCache.java | 14 ++++++ .../thrift2/ThriftHBaseServiceHandler.java | 2 +- .../thrift2/TestThriftHBaseServiceHandler.java | 51 ++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/276acdb0/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java index 1475879..ec5294b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java @@ -189,6 +189,20 @@ public class ConnectionCache { return connInfo; } + /** + * Updates the access time for the current connection. Used to keep Connections alive for + * long-lived scanners. + * @return whether we successfully updated the last access time + */ + public boolean updateConnectionAccessTime() { + String userName = getEffectiveUser(); + ConnectionInfo connInfo = connections.get(userName); + if (connInfo != null) { + return connInfo.updateAccessTime(); + } + return false; + } + class ConnectionInfo { final Connection connection; final String userName; http://git-wip-us.apache.org/repos/asf/hbase/blob/276acdb0/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index c59576e..5900952 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -382,8 +382,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface { ex.setMessage("Invalid scanner Id"); throw ex; } - try { + connectionCache.updateConnectionAccessTime(); return resultsFromHBase(scanner.next(numRows)); } catch (IOException e) { throw getTIOError(e); http://git-wip-us.apache.org/repos/asf/hbase/blob/276acdb0/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java ---------------------------------------------------------------------- diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java index 568ab0f..2a59b87 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java @@ -589,6 +589,57 @@ public class TestThriftHBaseServiceHandler { } } + /** + * Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow() + * should reset the ConnectionCache timeout for the scanner's connection + * @throws Exception + */ + @Test + public void testLongLivedScan() throws Exception { + int numTrials = 6; + int trialPause = 1000; + int cleanUpInterval = 100; + Configuration conf = new Configuration(UTIL.getConfiguration()); + // Set the ConnectionCache timeout to trigger halfway through the trials + conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause); + conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval); + ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf, + UserProvider.instantiate(conf)); + + ByteBuffer table = wrap(tableAname); + // insert data + TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname), + wrap(valueAname)); + List<TColumnValue> columnValues = new ArrayList<TColumnValue>(); + columnValues.add(columnValue); + for (int i = 0; i < numTrials; i++) { + TPut put = new TPut(wrap(("testScan" + i).getBytes()), columnValues); + handler.put(table, put); + } + + // create scan instance + TScan scan = new TScan(); + List<TColumn> columns = new ArrayList<TColumn>(); + TColumn column = new TColumn(); + column.setFamily(familyAname); + column.setQualifier(qualifierAname); + columns.add(column); + scan.setColumns(columns); + scan.setStartRow("testScan".getBytes()); + scan.setStopRow("testScan\uffff".getBytes()); + // Prevent the scanner from caching results + scan.setCaching(1); + + // get scanner and rows + int scanId = handler.openScanner(table, scan); + for (int i = 0; i < numTrials; i++) { + // Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout + List<TResult> results = handler.getScannerRows(scanId, 1); + assertArrayEquals(("testScan" + i).getBytes(), results.get(0).getRow()); + Thread.sleep(trialPause); + } + } + @Test public void testReverseScan() throws Exception { ThriftHBaseServiceHandler handler = createHandler();