PHOENIX-1413 Add Phoenix coprocessors with configurable priority
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/de7c4df1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/de7c4df1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/de7c4df1 Branch: refs/heads/master Commit: de7c4df1c43e5eb34630d75b30568642488fe43d Parents: 429c69d Author: James Taylor <jtay...@salesforce.com> Authored: Thu Nov 6 18:24:54 2014 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu Nov 6 19:50:04 2014 -0800 ---------------------------------------------------------------------- .../EndToEndCoveredColumnsIndexBuilderIT.java | 3 ++- .../org/apache/phoenix/hbase/index/Indexer.java | 10 ++++----- .../CoveredColumnIndexSpecifierBuilder.java | 4 ++-- .../query/ConnectionQueryServicesImpl.java | 23 ++++++++++---------- .../org/apache/phoenix/query/QueryServices.java | 2 ++ .../phoenix/query/QueryServicesOptions.java | 5 +++++ 6 files changed, 28 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index fc134a3..f635dff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -33,6 +33,7 @@ import java.util.Queue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -302,7 +303,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT { // initializer blows up. indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, CoveredIndexCodecForTesting.class.getName()); - Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts); + Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER); // create the table HBaseAdmin admin = UTIL.getHBaseAdmin(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 9c48a8d..b841410 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -31,7 +31,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -607,18 +606,19 @@ public class Indexer extends BaseRegionObserver { /** * Enable indexing on the given table * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled - * @param builder class to use when building the index for this table - * @param properties map of custom configuration options to make available to your + * @param builder class to use when building the index for this table + * @param properties map of custom configuration options to make available to your * {@link IndexBuilder} on the server-side + * @param priority TODO * @throws IOException the Indexer coprocessor cannot be added */ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder, - Map<String, String> properties) throws IOException { + Map<String, String> properties, int priority) throws IOException { if (properties == null) { properties = new HashMap<String, String>(); } properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName()); - desc.addCoprocessor(Indexer.class.getName(), null, Coprocessor.PRIORITY_USER, properties); + desc.addCoprocessor(Indexer.class.getName(), null, priority, properties); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java index 9fcd5f3..6ac89d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HTableDescriptor; - import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexCodec; @@ -137,7 +137,7 @@ public class CoveredColumnIndexSpecifierBuilder { // add the codec for the index to the map of options Map<String, String> opts = this.convertToMap(); opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName()); - Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts); + Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER); } static List<ColumnGroup> getColumns(Configuration conf) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 780e34a..7036909 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -604,18 +604,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException { // The phoenix jar must be available on HBase classpath + int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); try { if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) { - descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) { - descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) { - descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { - descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); + descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. @@ -627,11 +628,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && !descriptor.hasCoprocessor(Indexer.class.getName())) { Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); - Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts); + Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority); } if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) { descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(), - null, 1, null); + null, priority, null); } if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null @@ -639,13 +640,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { if (!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) { descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(), - null, 1, null); + null, priority, null); } } else { if (!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName()) && !SchemaUtil.isMetaTable(tableName) && !SchemaUtil.isSequenceTable(tableName)) { - descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, 1, null); + descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, null); } } @@ -653,14 +654,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // stay on the same region. if (SchemaUtil.isMetaTable(tableName)) { if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) { - descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, 1, null); + descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) { - descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, 2, null); + descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null); } } else if (SchemaUtil.isSequenceTable(tableName)) { if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) { - descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null); } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 72002ae..414ed57 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -139,6 +139,8 @@ public interface QueryServices extends SQLCloseable { public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; + public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; + /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 7c8ecd4..8491783 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -60,6 +60,7 @@ import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.trace.util.Tracing; @@ -156,6 +157,10 @@ public class QueryServicesOptions { * Use only first time SYSTEM.SEQUENCE table is created. */ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = SaltingUtil.MAX_BUCKET_NUM; + /** + * Default value for coprocessor priority is between SYSTEM and USER priority. + */ + public static final int DEFAULT_COPROCESSOR_PRIORITY = Coprocessor.PRIORITY_SYSTEM/2 + Coprocessor.PRIORITY_USER/2; // Divide individually to prevent any overflow private final Configuration config;