PHOENIX-1413 Add Phoenix coprocessors with configurable priority

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/de7c4df1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/de7c4df1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/de7c4df1

Branch: refs/heads/master
Commit: de7c4df1c43e5eb34630d75b30568642488fe43d
Parents: 429c69d
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Nov 6 18:24:54 2014 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Nov 6 19:50:04 2014 -0800

----------------------------------------------------------------------
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |  3 ++-
 .../org/apache/phoenix/hbase/index/Indexer.java | 10 ++++-----
 .../CoveredColumnIndexSpecifierBuilder.java     |  4 ++--
 .../query/ConnectionQueryServicesImpl.java      | 23 ++++++++++----------
 .../org/apache/phoenix/query/QueryServices.java |  2 ++
 .../phoenix/query/QueryServicesOptions.java     |  5 +++++
 6 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index fc134a3..f635dff 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -33,6 +33,7 @@ import java.util.Queue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -302,7 +303,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     // initializer blows up.
     indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY,
       CoveredIndexCodecForTesting.class.getName());
-    Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, 
indexerOpts);
+    Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, 
indexerOpts, Coprocessor.PRIORITY_USER);
 
     // create the table
     HBaseAdmin admin = UTIL.getHBaseAdmin();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 9c48a8d..b841410 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -607,18 +606,19 @@ public class Indexer extends BaseRegionObserver {
   /**
    * Enable indexing on the given table
    * @param desc {@link HTableDescriptor} for the table on which indexing 
should be enabled
-   * @param builder class to use when building the index for this table
-   * @param properties map of custom configuration options to make available 
to your
+ * @param builder class to use when building the index for this table
+ * @param properties map of custom configuration options to make available to 
your
    *          {@link IndexBuilder} on the server-side
+ * @param priority priority with which the {@link Indexer} coprocessor is added to the table descriptor
    * @throws IOException the Indexer coprocessor cannot be added
    */
   public static void enableIndexing(HTableDescriptor desc, Class<? extends 
IndexBuilder> builder,
-      Map<String, String> properties) throws IOException {
+      Map<String, String> properties, int priority) throws IOException {
     if (properties == null) {
       properties = new HashMap<String, String>();
     }
     properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName());
-    desc.addCoprocessor(Indexer.class.getName(), null, 
Coprocessor.PRIORITY_USER, properties);
+    desc.addCoprocessor(Indexer.class.getName(), null, priority, properties);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 9fcd5f3..6ac89d1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
@@ -137,7 +137,7 @@ public class CoveredColumnIndexSpecifierBuilder {
     // add the codec for the index to the map of options
     Map<String, String> opts = this.convertToMap();
     opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
-    Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts);
+    Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, 
Coprocessor.PRIORITY_USER);
   }
 
   static List<ColumnGroup> getColumns(Configuration conf) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 780e34a..7036909 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -604,18 +604,19 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
 
     private void addCoprocessors(byte[] tableName, HTableDescriptor 
descriptor, PTableType tableType) throws SQLException {
         // The phoenix jar must be available on HBase classpath
+        int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, 
QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
         try {
             if 
(!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) {
-                descriptor.addCoprocessor(ScanRegionObserver.class.getName(), 
null, 1, null);
+                descriptor.addCoprocessor(ScanRegionObserver.class.getName(), 
null, priority, null);
             }
             if 
(!descriptor.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())) {
-                
descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), 
null, 1, null);
+                
descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), 
null, priority, null);
             }
             if 
(!descriptor.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) {
-                
descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, 
1, null);
+                
descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, 
priority, null);
             }
             if 
(!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
-                
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, 
null);
+                
descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 
priority, null);
             }
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing 
coprocessor for indexes.
@@ -627,11 +628,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     && !descriptor.hasCoprocessor(Indexer.class.getName())) {
                 Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
                 opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, 
PhoenixIndexCodec.class.getName());
-                Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, 
opts);
+                Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, 
opts, priority);
             }
             if (SchemaUtil.isStatsTable(tableName) && 
!descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
                 
descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
-                        null, 1, null);
+                        null, priority, null);
             }
             
             if 
(descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null
@@ -639,13 +640,13 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             
.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
                 if 
(!descriptor.hasCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName())) 
{
                     
descriptor.addCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName(),
-                        null, 1, null);
+                        null, priority, null);
                 }
             } else {
                 if 
(!descriptor.hasCoprocessor(LocalIndexSplitter.class.getName())
                         && !SchemaUtil.isMetaTable(tableName)
                         && !SchemaUtil.isSequenceTable(tableName)) {
-                    
descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, 1, null);
+                    
descriptor.addCoprocessor(LocalIndexSplitter.class.getName(), null, priority, 
null);
                 }
             }
 
@@ -653,14 +654,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             // stay on the same region.
             if (SchemaUtil.isMetaTable(tableName)) {
                 if 
(!descriptor.hasCoprocessor(MetaDataEndpointImpl.class.getName())) {
-                    
descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, 1, null);
+                    
descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, priority, 
null);
                 }
                 if 
(!descriptor.hasCoprocessor(MetaDataRegionObserver.class.getName())) {
-                    
descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, 2, 
null);
+                    
descriptor.addCoprocessor(MetaDataRegionObserver.class.getName(), null, 
priority + 1, null);
                 }
             } else if (SchemaUtil.isSequenceTable(tableName)) {
                 if 
(!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) {
-                    
descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, 1, 
null);
+                    
descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, 
priority, null);
                 }
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 72002ae..414ed57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -139,6 +139,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String STATS_USE_CURRENT_TIME_ATTRIB = 
"phoenix.stats.useCurrentTime";
 
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = 
"phoenix.sequence.saltBuckets";
+    public static final String COPROCESSOR_PRIORITY_ATTRIB = 
"phoenix.coprocessor.priority";
+    
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de7c4df1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 7c8ecd4..8491783 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -60,6 +60,7 @@ import static 
org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.trace.util.Tracing;
@@ -156,6 +157,10 @@ public class QueryServicesOptions {
      * Use only first time SYSTEM.SEQUENCE table is created.
      */
     public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = 
SaltingUtil.MAX_BUCKET_NUM;
+    /**
+     * Default value for coprocessor priority is between SYSTEM and USER 
priority.
+     */
+    public static final int DEFAULT_COPROCESSOR_PRIORITY = 
Coprocessor.PRIORITY_SYSTEM/2 + Coprocessor.PRIORITY_USER/2; // Divide 
individually to prevent any overflow
 
     private final Configuration config;
 

Reply via email to