PHOENIX-1413 Add Phoenix coprocessors with configurable priority

Conflicts:
    phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
    phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
    phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
    phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/46d21602 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/46d21602 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/46d21602 Branch: refs/heads/3.2 Commit: 46d216028da35f749223cfebf4e9677ce15bacd7 Parents: da6d554 Author: James Taylor <jtay...@salesforce.com> Authored: Thu Nov 6 18:24:54 2014 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu Nov 6 22:48:23 2014 -0800 ---------------------------------------------------------------------- .../EndToEndCoveredColumnsIndexBuilderIT.java | 3 ++- .../org/apache/phoenix/hbase/index/Indexer.java | 10 +++++----- .../CoveredColumnIndexSpecifierBuilder.java | 4 ++-- .../query/ConnectionQueryServicesImpl.java | 19 ++++++++++--------- .../org/apache/phoenix/query/QueryServices.java | 2 ++ .../phoenix/query/QueryServicesOptions.java | 5 +++++ 6 files changed, 26 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d21602/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index d7b9099..75ed0bf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Queue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseTestingUtility; 
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -301,7 +302,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT { // initializer blows up. indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, CoveredIndexCodecForTesting.class.getName()); - Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts); + Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER); // create the table HBaseAdmin admin = UTIL.getHBaseAdmin(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d21602/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 3ab400a..8531361 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -32,7 +32,6 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -639,17 +638,18 @@ public class Indexer extends BaseRegionObserver { /** * Enable indexing on the given table * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled - * @param builder class to use when building the index for this table - * @param properties map of custom configuration options to make available to your + * @param builder class to use when building the index for this table + * @param properties map of custom configuration options to make available to your * {@link IndexBuilder} on 
the server-side + * @param priority TODO * @throws IOException the Indexer coprocessor cannot be added */ public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder, - Map<String, String> properties) throws IOException { + Map<String, String> properties, int priority) throws IOException { if (properties == null) { properties = new HashMap<String, String>(); } properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName()); - desc.addCoprocessor(Indexer.class.getName(), null, Coprocessor.PRIORITY_USER, properties); + desc.addCoprocessor(Indexer.class.getName(), null, priority, properties); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d21602/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java index 9fcd5f3..6ac89d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HTableDescriptor; - import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexCodec; @@ -137,7 +137,7 @@ public class CoveredColumnIndexSpecifierBuilder { // add the codec for the index to the map of options Map<String, String> opts = this.convertToMap(); 
opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName()); - Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts); + Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER); } static List<ColumnGroup> getColumns(Configuration conf) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d21602/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 31d46e0..a65898f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -530,18 +530,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType) throws SQLException { // The phoenix jar must be available on HBase classpath + int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY); try { if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) { - descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) { - descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) { - descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), 
null, 1, null); + descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { - descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); + descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } // TODO: better encapsulation for this @@ -554,25 +555,25 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && !descriptor.hasCoprocessor(Indexer.class.getName())) { Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); - Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts); + Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority); } if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) { descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(), - null, 1, null); + null, priority, null); } // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table // stay on the same region. 
if (SchemaUtil.isMetaTable(tableName)) { if (!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) { - descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, 1, null); + descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, null); } if (!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) { - descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, 2, null); + descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, priority + 1, null); } } else if (SchemaUtil.isSequenceTable(tableName)) { if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) { - descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, 1, null); + descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null); } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d21602/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index d3faf2e..812879e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -127,6 +127,8 @@ public interface QueryServices extends SQLCloseable { public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; + public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; + /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d21602/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 117f285..67eb690 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -60,6 +60,7 @@ import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.regionserver.wal.WALEditCodec; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.util.DateUtil; @@ -141,6 +142,10 @@ public class QueryServicesOptions { * Use only first time SYSTEM.SEQUENCE table is created. */ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = SaltingUtil.MAX_BUCKET_NUM; + /** + * Default value for coprocessor priority is between SYSTEM and USER priority. + */ + public static final int DEFAULT_COPROCESSOR_PRIORITY = Coprocessor.PRIORITY_SYSTEM/2 + Coprocessor.PRIORITY_USER/2; // Divide individually to prevent any overflow private final Configuration config;