Repository: phoenix Updated Branches: refs/heads/4.11-HBase-1.1 714121cb6 -> 46afbbc7b
Revert "PHOENIX-3994 Index RPC priority still depends on the controller factory property in hbase-site.xml" This reverts commit 714121cb6d82ce7a4cfbb623d63679ce04655609. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/46afbbc7 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/46afbbc7 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/46afbbc7 Branch: refs/heads/4.11-HBase-1.1 Commit: 46afbbc7b539d2e9ae0d29c07145d1ce630687cd Parents: 714121c Author: Samarth Jain <sama...@apache.org> Authored: Wed Jul 12 00:29:42 2017 -0700 Committer: Samarth Jain <sama...@apache.org> Committed: Wed Jul 12 00:29:42 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 7 +- .../hbase/index/write/IndexWriterUtils.java | 86 +++----------------- .../write/ParallelWriterIndexCommitter.java | 27 +++--- .../java/org/apache/phoenix/query/BaseTest.java | 4 +- 4 files changed, 32 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/46afbbc7/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 6119548..b9e4fff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -28,7 +28,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -69,7 +68,8 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, TestPhoenixIndexRpcSchedulerFactory.class.getName()); // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue - Map<String, String> clientProps = Collections.emptyMap(); + Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + RpcControllerFactory.class.getName()); NUM_SLAVES_BASE = 2; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } @@ -143,6 +143,9 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); TestPhoenixIndexRpcSchedulerFactory.reset(); + createIndex(conn, indexName + "_1"); + // verify that that index queue is used and only once (during Upsert Select on server to build the index) + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/46afbbc7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 703f35c..6eb657d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -17,24 +17,12 @@ */ package org.apache.phoenix.hbase.index.write; -import java.io.IOException; -import java.util.concurrent.ExecutorService; - -import javax.annotation.concurrent.GuardedBy; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.client.CoprocessorHConnection; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; public class IndexWriterUtils { @@ -73,68 +61,14 @@ public class IndexWriterUtils { // private ctor for utilites } - public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { - // create a simple delegate factory, setup the way we need - Configuration conf = env.getConfiguration(); - // set the number of threads allowed per table. - int htableThreads = - conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, - IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); - LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); - IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); - if (env instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; - RegionServerServices services = e.getRegionServerServices(); - if (services instanceof HRegionServer) { - return new SimpleTableFactory(conf, (HRegionServer) services); - } - } else { - return new CoprocessorHTableFactory(env); - } - throw new IllegalStateException("Unexpected environment or settings!"); - } - - /** - * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection} This - * factory was added as a workaround to the bug reported in - * https://issues.apache.org/jira/browse/HBASE-18359 - */ - private static class SimpleTableFactory implements HTableFactory { - @GuardedBy("SimpleTableFactory.this") - private HConnection connection; - private final Configuration conf; - private final HRegionServer server; - - SimpleTableFactory(Configuration conf, HRegionServer server) { - this.conf = conf; - this.server = server; - } - - private synchronized HConnection getConnection(Configuration conf) throws IOException { - if (connection == null || connection.isClosed()) { - connection = new CoprocessorHConnection(conf, server); - } - return connection; - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary()); - } - - @Override - public void shutdown() { - try { - getConnection(conf).close(); - } catch (IOException e) { - LOG.error("Exception caught while trying to close the HConnection used by SimpleTableFactory"); - } - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) - throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool); - } - } + public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { + // create a simple delegate factory, setup the way we need + Configuration conf = env.getConfiguration(); + // set the number of threads allowed per table. + int htableThreads = + conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); + LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); + IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); + return new CoprocessorHTableFactory(env); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/46afbbc7/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java index 9f2d13d..5823bd9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java @@ -148,21 +148,22 @@ public class ParallelWriterIndexCommitter implements IndexCommitter { } HTableInterface table = null; try { - if (allowLocalUpdates && env != null) { - try { - throwFailureIfDone(); - IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true); - return null; - } catch (IOException ignord) { - // when it's failed we fall back to the standard & slow way - if (LOG.isDebugEnabled()) { - LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" - + ignord); - } - } - } + if (allowLocalUpdates && env!=null) { + try { + throwFailureIfDone(); + IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true); + return null; + } catch (IOException ignord) { + // when it's failed we fall back to the standard & slow way + if (LOG.isTraceEnabled()) { + LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error=" + + ignord); + } + } + } table = factory.getTable(tableReference.get()); throwFailureIfDone(); + int i = 0; table.batch(mutations); } catch (SingleIndexWriteFailureException e) { throw e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/46afbbc7/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 08dc2ee..078c1e8 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.util.Bytes; @@ -633,8 +634,9 @@ public abstract class BaseTest { //no point doing sanity checks when running tests. conf.setBoolean("hbase.table.sanity.checks", false); // set the server rpc controller and rpc scheduler factory, used to configure the cluster + conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_SERVER_RPC_CONTROLLER_FACTORY); conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY); - + // override any defaults based on overrideProps for (Entry<String,String> entry : overrideProps) { conf.set(entry.getKey(), entry.getValue());