Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 3bf0e7451 -> b6ded29d2
Revert "PHOENIX-3994 Index RPC priority still depends on the controller factory property in hbase-site.xml" This reverts commit 3bf0e74519f2aaad963475121f18d8d0053c9a5e. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b6ded29d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b6ded29d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b6ded29d Branch: refs/heads/4.x-HBase-0.98 Commit: b6ded29d20d27419e11b57a339ef0dc24bd3f4fa Parents: 3bf0e74 Author: Samarth Jain <sama...@apache.org> Authored: Wed Jul 12 00:28:01 2017 -0700 Committer: Samarth Jain <sama...@apache.org> Committed: Wed Jul 12 00:28:01 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 7 +- .../hbase/index/write/IndexWriterUtils.java | 86 +++----------------- .../write/ParallelWriterIndexCommitter.java | 7 +- .../java/org/apache/phoenix/query/BaseTest.java | 2 + 4 files changed, 22 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b6ded29d/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 48974c1..6782c3e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -28,7 +28,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -68,7 +67,8 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Map<String, String> serverProps = Collections.singletonMap(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, TestPhoenixIndexRpcSchedulerFactory.class.getName()); // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue - Map<String, String> clientProps = Collections.emptyMap(); + Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + RpcControllerFactory.class.getName()); NUM_SLAVES_BASE = 2; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } @@ -142,6 +142,9 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); TestPhoenixIndexRpcSchedulerFactory.reset(); + createIndex(conn, indexName + "_1"); + // verify that that index queue is used and only once (during Upsert Select on server to build the index) + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b6ded29d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 703f35c..6eb657d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -17,24 +17,12 @@ */ package org.apache.phoenix.hbase.index.write; -import java.io.IOException; -import java.util.concurrent.ExecutorService; - -import javax.annotation.concurrent.GuardedBy; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.client.CoprocessorHConnection; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; public class IndexWriterUtils { @@ -73,68 +61,14 @@ public class IndexWriterUtils { // private ctor for utilites } - public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { - // create a simple delegate factory, setup the way we need - Configuration conf = env.getConfiguration(); - // set the number of threads allowed per table. - int htableThreads = - conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, - IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); - LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); - IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); - if (env instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; - RegionServerServices services = e.getRegionServerServices(); - if (services instanceof HRegionServer) { - return new SimpleTableFactory(conf, (HRegionServer) services); - } - } else { - return new CoprocessorHTableFactory(env); - } - throw new IllegalStateException("Unexpected environment or settings!"); - } - - /** - * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection} This - * factory was added as a workaround to the bug reported in - * https://issues.apache.org/jira/browse/HBASE-18359 - */ - private static class SimpleTableFactory implements HTableFactory { - @GuardedBy("SimpleTableFactory.this") - private HConnection connection; - private final Configuration conf; - private final HRegionServer server; - - SimpleTableFactory(Configuration conf, HRegionServer server) { - this.conf = conf; - this.server = server; - } - - private synchronized HConnection getConnection(Configuration conf) throws IOException { - if (connection == null || connection.isClosed()) { - connection = new CoprocessorHConnection(conf, server); - } - return connection; - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary()); - } - - @Override - public void shutdown() { - try { - getConnection(conf).close(); - } catch (IOException e) { - LOG.error("Exception caught while trying to close the HConnection used by SimpleTableFactory"); - } - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) - throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool); - } - } + public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { + // create a simple delegate factory, setup the way we need + Configuration conf = env.getConfiguration(); + // set the number of threads allowed per table. + int htableThreads = + conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); + LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); + IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); + return new CoprocessorHTableFactory(env); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b6ded29d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index 9f2d13d..b912772 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -21,7 +21,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -155,8 +158,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { return null; } catch (IOException ignord) { // when it's failed we fall back to the standard & slow way - if (LOG.isDebugEnabled()) { - LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" + if (LOG.isTraceEnabled()) { + LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" + ignord); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b6ded29d/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index a124ed5..b01fc4f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -118,6 +118,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; @@ -570,6 +571,7 @@ public abstract class BaseTest { //no point doing sanity checks when running tests. conf.setBoolean("hbase.table.sanity.checks", false); // set the server rpc controller and rpc scheduler factory, used to configure the cluster + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_SERVER_RPC_CONTROLLER_FACTORY); conf.set(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY); conf.setLong(HConstants.ZK_SESSION_TIMEOUT, 10 * HConstants.DEFAULT_ZK_SESSION_TIMEOUT); conf.setLong(HConstants.ZOOKEEPER_TICK_TIME, 6 * 1000);