Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 82d0878c7 -> 85aa8449d
PHOENIX-3994 Addendum - set index rpc controller factory for transactional indexer Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/85aa8449 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/85aa8449 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/85aa8449 Branch: refs/heads/4.x-HBase-1.1 Commit: 85aa8449dca6b488d47e35b93e1ba40d53f3a30e Parents: 82d0878 Author: Samarth Jain <[email protected]> Authored: Sun Jul 16 00:56:19 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Sun Jul 16 00:56:19 2017 -0700 ---------------------------------------------------------------------- .../index/PhoenixTransactionalIndexer.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/85aa8449/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 497c5ac..d486a74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -51,6 +52,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -59,6 +62,7 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.MultiMutation; import org.apache.phoenix.hbase.index.ValueGetter; @@ -79,6 +83,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.transaction.PhoenixTransactionalTable; import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; @@ -111,11 +116,20 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { String serverName = env.getRegionServerServices().getServerName().getServerName(); codec = new PhoenixIndexCodec(); codec.initialize(env); - + // Clone the config since it is shared + Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); + /* + * Set the rpc controller factory so that the HTables used by IndexWriter would + * set the correct priorities on the remote RPC calls. + */ + clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); + // setup the actual index writer // setup the actual index writer // For transactional tables, we keep the index active upon a write failure // since we have the all versus none behavior for transactions. - this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), env, serverName + "-tx-index-writer"); + this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer"); } @Override
