Repository: phoenix Updated Branches: refs/heads/4.x-cdh5.12 031ca5a1f -> 157139688 (forced update)
PHOENIX-4683 Cap timeouts for stats precompact hook logic Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7871e72c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7871e72c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7871e72c Branch: refs/heads/4.x-cdh5.12 Commit: 7871e72c95c4966e6dcc81b02cab65ad29a69bff Parents: f3defc4 Author: Vincent Poon <vincentp...@apache.org> Authored: Mon Apr 9 22:04:28 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Fri Apr 13 23:24:49 2018 +0100 ---------------------------------------------------------------------- .../DelegateRegionCoprocessorEnvironment.java | 9 ++- .../UngroupedAggregateRegionObserver.java | 8 ++- .../hbase/index/write/IndexWriterUtils.java | 71 ++----------------- .../org/apache/phoenix/util/ServerUtil.java | 72 ++++++++++++++++++++ 4 files changed, 89 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7871e72c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java index 380212e..284d53c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionCoprocessorEnvironment.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.util.ServerUtil; /** * Class to encapsulate {@link RegionCoprocessorEnvironment} for phoenix coprocessors. Often we @@ -39,10 +42,12 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn private final Configuration config; private RegionCoprocessorEnvironment delegate; + private HTableFactory tableFactory; public DelegateRegionCoprocessorEnvironment(Configuration config, RegionCoprocessorEnvironment delegate) { this.config = config; this.delegate = delegate; + this.tableFactory = ServerUtil.getDelegateHTableFactory(this, config); } @Override @@ -77,13 +82,13 @@ public class DelegateRegionCoprocessorEnvironment implements RegionCoprocessorEn @Override public HTableInterface getTable(TableName tableName) throws IOException { - return delegate.getTable(tableName); + return tableFactory.getTable(new ImmutableBytesPtr(tableName.getName())); } @Override public HTableInterface getTable(TableName tableName, ExecutorService service) throws IOException { - return delegate.getTable(tableName, service); + return tableFactory.getTable(new ImmutableBytesPtr(tableName.getName())); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/7871e72c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 965ba1b..27d3880 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -94,6 +94,7 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; @@ -978,10 +979,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver InternalScanner internalScanner = scanner; try { long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); + DelegateRegionCoprocessorEnvironment compactionConfEnv = new DelegateRegionCoprocessorEnvironment(compactionConfig, c.getEnvironment()); StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( - c.getEnvironment(), table.getNameAsString(), clientTimeStamp, + compactionConfEnv, table.getNameAsString(), clientTimeStamp, store.getFamily().getName()); - internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner); + internalScanner = + stats.createCompactionScanner(compactionConfEnv, + store, scanner); } catch (Exception e) { // If we can't reach the stats table, don't interrupt the normal // compaction operation, just log a warning. http://git-wip-us.apache.org/repos/asf/phoenix/blob/7871e72c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java index 29b9faf..76d6800 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java @@ -17,25 +17,14 @@ */ package org.apache.phoenix.hbase.index.write; -import java.io.IOException; -import java.util.concurrent.ExecutorService; - -import javax.annotation.concurrent.GuardedBy; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.client.CoprocessorHConnection; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory; import org.apache.phoenix.hbase.index.table.HTableFactory; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ServerUtil; public class IndexWriterUtils { @@ -86,66 +75,14 @@ public class IndexWriterUtils { public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) { // create a simple delegate factory, setup the way we need - Configuration conf = env.getConfiguration(); + Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration()); // set the number of threads allowed per table. int htableThreads = conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS); LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable."); IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads); - if (env instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; - RegionServerServices services = e.getRegionServerServices(); - if (services instanceof HRegionServer) { - return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services); - } - } - return new CoprocessorHTableFactory(env); + return ServerUtil.getDelegateHTableFactory(env, conf); } - /** - * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection} This - * factory was added as a workaround to the bug reported in - * https://issues.apache.org/jira/browse/HBASE-18359 - */ - private static class CoprocessorHConnectionTableFactory implements HTableFactory { - @GuardedBy("CoprocessorHConnectionTableFactory.this") - private HConnection connection; - private final Configuration conf; - private final HRegionServer server; - - CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) { - this.conf = conf; - this.server = server; - } - - private synchronized HConnection getConnection(Configuration conf) throws IOException { - if (connection == null || connection.isClosed()) { - connection = new CoprocessorHConnection(conf, server); - } - return connection; - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary()); - } - - @Override - public synchronized void shutdown() { - try { - if (connection != null && !connection.isClosed()) { - connection.close(); - } - } catch (Throwable e) { - LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e); - } - } - - @Override - public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) - throws IOException { - return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool); - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7871e72c/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index bc2b625..4b3cc43 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -21,29 +21,44 @@ import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.annotation.concurrent.GuardedBy; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.CoprocessorHConnection; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory; +import org.apache.phoenix.hbase.index.table.HTableFactory; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; @SuppressWarnings("deprecation") public class ServerUtil { + private static final Log LOG = LogFactory.getLog(ServerUtil.class); private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6"); private static final String FORMAT = "ERROR %d (%s): %s"; @@ -254,4 +269,61 @@ public class ServerUtil { endKey) < 0)); } + public static HTableFactory getDelegateHTableFactory(CoprocessorEnvironment env, Configuration conf) { + if (env instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; + RegionServerServices services = e.getRegionServerServices(); + if (services instanceof HRegionServer) { + return new CoprocessorHConnectionTableFactory(conf, (HRegionServer) services); + } + } + return new CoprocessorHTableFactory(env); + } + + /** + * {@code HTableFactory} that creates HTables by using a {@link CoprocessorHConnection} This + * factory was added as a workaround to the bug reported in + * https://issues.apache.org/jira/browse/HBASE-18359 + */ + public static class CoprocessorHConnectionTableFactory implements HTableFactory { + @GuardedBy("CoprocessorHConnectionTableFactory.this") + private HConnection connection; + private final Configuration conf; + private final HRegionServer server; + + CoprocessorHConnectionTableFactory(Configuration conf, HRegionServer server) { + this.conf = conf; + this.server = server; + } + + private synchronized HConnection getConnection(Configuration conf) throws IOException { + if (connection == null || connection.isClosed()) { + connection = new CoprocessorHConnection(conf, server); + } + return connection; + } + + @Override + public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException { + return getConnection(conf).getTable(tablename.copyBytesIfNecessary()); + } + + @Override + public synchronized void shutdown() { + try { + if (connection != null && !connection.isClosed()) { + connection.close(); + } + } catch (Throwable e) { + LOG.warn("Error while trying to close the HConnection used by CoprocessorHConnectionTableFactory", e); + } + } + + @Override + public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) + throws IOException { + return getConnection(conf).getTable(tablename.copyBytesIfNecessary(), pool); + } + } + }