Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 2e20b5131 -> b0f87aaf4
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0f87aaf/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java new file mode 100644 index 0000000..7d8904d --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ScannerLeaseRenewalTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.query;
+
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
+import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.ref.WeakReference;
+import java.sql.DriverManager;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.phoenix.iterate.RenewLeaseOnlyTableIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl.RenewLeaseTask;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+/**
+ * Connectionless tests that drive {@link RenewLeaseTask} by hand against a single
+ * {@link PhoenixConnection} whose scanner queue holds one {@link RenewLeaseOnlyTableIterator}.
+ * After every {@code task.run()} the tests check the size of the connections queue, the size of
+ * the connection's scanner queue and the status last reported by the iterator.
+ */
+public class ScannerLeaseRenewalTest extends BaseConnectionlessQueryTest {
+
+    /**
+     * Runs the task five times while the connection is open and expects the iterator to report
+     * RENEWED, THRESHOLD_NOT_REACHED, RENEWED, RENEWED and finally CLOSED. The scanner is expected
+     * to stay in the scanner queue until the run that reports CLOSED, and the connection is
+     * expected to leave the connections queue only on the run that follows its {@code close()}.
+     */
+    @Test
+    public void testRenewLeaseTaskBehavior() throws Exception {
+        // try-with-resources releases the connection even when an assertion fails before the
+        // explicit close() below; per the JDBC contract the second, implicit close() is a no-op.
+        try (PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class)) {
+            // add connection to the queue
+            LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
+            connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
+
+            // create a scanner and add it to the queue
+            int numLeaseRenewals = 4;
+            int thresholdNotReachedCount = 2;
+            // This test never expects NOT_RENEWED; -1 presumably disables that status.
+            // NOTE(review): confirm against RenewLeaseOnlyTableIterator.
+            int leaseNotRenewedCount = -1;
+            RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, leaseNotRenewedCount);
+            LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
+            scannerQueue.add(new WeakReference<TableResultIterator>(itr));
+
+            RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(1, scannerQueue.size());
+
+            // run 1: lease renewed
+            task.run();
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(1, scannerQueue.size());
+            assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
+
+            // run 2: renew lease skipped but scanner still in the queue
+            task.run();
+            assertEquals(1, scannerQueue.size());
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
+
+            // run 3: lease renewed
+            task.run();
+            assertEquals(1, scannerQueue.size());
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
+
+            // run 4: lease renewed
+            task.run();
+            assertEquals(1, scannerQueue.size());
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
+
+            // run 5: scanner closed and removed from the queue
+            task.run();
+            assertEquals(0, scannerQueue.size());
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(CLOSED, itr.getLastRenewLeaseStatus());
+
+            // the connection is only dropped from the queue by the run that follows close()
+            pconn.close();
+            task.run();
+            assertEquals(0, scannerQueue.size());
+            assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
+        }
+    }
+
+    /**
+     * Same setup as {@link #testRenewLeaseTaskBehavior()}, but the iterator is created with a
+     * lease-not-renewed count of 3. The test expects RENEWED, THRESHOLD_NOT_REACHED and then
+     * NOT_RENEWED, and expects the scanner to be gone from the scanner queue as soon as
+     * NOT_RENEWED is reported.
+     */
+    @Test
+    public void testRenewLeaseTaskBehaviorOnError() throws Exception {
+        // try-with-resources releases the connection even when an assertion fails before the
+        // explicit close() below; per the JDBC contract the second, implicit close() is a no-op.
+        try (PhoenixConnection pconn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class)) {
+            // add connection to the queue
+            LinkedBlockingQueue<WeakReference<PhoenixConnection>> connectionsQueue = new LinkedBlockingQueue<>();
+            connectionsQueue.add(new WeakReference<PhoenixConnection>(pconn));
+
+            // create a scanner and add it to the queue
+            int numLeaseRenewals = 4;
+            int thresholdNotReachedCount = 2;
+            int leaseNotRenewedCount = 3;
+            RenewLeaseOnlyTableIterator itr = new RenewLeaseOnlyTableIterator(numLeaseRenewals, thresholdNotReachedCount, leaseNotRenewedCount);
+            LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue = pconn.getScanners();
+            scannerQueue.add(new WeakReference<TableResultIterator>(itr));
+
+            RenewLeaseTask task = new RenewLeaseTask(connectionsQueue);
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(1, scannerQueue.size());
+
+            // run 1: lease renewed
+            task.run();
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(1, scannerQueue.size());
+            assertEquals(RENEWED, itr.getLastRenewLeaseStatus());
+
+            // run 2: renew lease skipped but scanner still in the queue
+            task.run();
+            assertEquals(1, scannerQueue.size());
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(THRESHOLD_NOT_REACHED, itr.getLastRenewLeaseStatus());
+
+            // run 3: Lease not renewed due to error or some other reason.
+            // In this case we don't call renew lease on the scanner anymore.
+            task.run();
+            assertEquals(0, scannerQueue.size());
+            assertEquals(1, connectionsQueue.size());
+            assertEquals(NOT_RENEWED, itr.getLastRenewLeaseStatus());
+
+            // the connection is only dropped from the queue by the run that follows close()
+            pconn.close();
+            task.run();
+            assertEquals(0, scannerQueue.size());
+            assertTrue("Closing the connection should have removed it from the queue", connectionsQueue.size() == 0);
+        }
+    }
+
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0f87aaf/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d379bfa..d0d9a4e 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,7 @@ <top.dir>${project.basedir}</top.dir> <!-- Hadoop Versions --> - <hbase.version>0.98.12-hadoop2</hbase.version> + <hbase.version>0.98.16-hadoop2</hbase.version> <hadoop-two.version>2.2.0</hadoop-two.version> <!-- Dependency versions -->