Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 7f38f7e70 -> 458973fd3


PHOENIX-4169 Explicitly cap timeout for index disable RPC on compaction 
(Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/63a409a4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/63a409a4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/63a409a4

Branch: refs/heads/4.x-HBase-0.98
Commit: 63a409a4028ac27e06c8a152ea75cf6b8cd32d1a
Parents: 7f38f7e
Author: James Taylor <jamestay...@apache.org>
Authored: Tue Sep 12 17:06:21 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Tue Sep 12 17:06:21 2017 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 29 ++++++++++++++++----
 .../org/apache/phoenix/hbase/index/Indexer.java | 14 +++++++++-
 .../org/apache/phoenix/query/QueryServices.java |  4 +++
 .../phoenix/query/QueryServicesOptions.java     |  5 ++++
 4 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b4d7e7f..4ae5087 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CoprocessorHConnection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import 
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -98,6 +100,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
@@ -192,6 +195,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     private static final Logger logger = 
LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
     private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
+    private Configuration compactionConfig;
 
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -212,6 +216,15 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
          */
         
upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
             InterRegionServerIndexRpcControllerFactory.class, 
RpcControllerFactory.class);
+
+        compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
+        // lower the number of rpc retries, so we don't hang the compaction
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            
e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+            
e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
     }
 
     private void commitBatch(HRegion region, List<Mutation> mutations, long 
blockingMemstoreSize) throws IOException {
@@ -929,11 +942,16 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                     public Void run() throws Exception {
                         MutationCode mutationCode = null;
                         long disableIndexTimestamp = 0;
-                        
-                        try (HTableInterface htable = 
e.getEnvironment().getTable(
-                                    SchemaUtil.getPhysicalTableName(
-                                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
-                                            
e.getEnvironment().getConfiguration()))) {
+
+                        try (CoprocessorHConnection coprocessorHConnection =
+                                new CoprocessorHConnection(compactionConfig,
+                                        (HRegionServer) e.getEnvironment()
+                                                .getRegionServerServices());
+                                HTableInterface htable =
+                                        coprocessorHConnection
+                                                
.getTable(SchemaUtil.getPhysicalTableName(
+                                                    
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                                    compactionConfig))) {
                             String tableName = 
e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                             // FIXME: if this is an index on a view, we won't 
find a row for it in SYSTEM.CATALOG
                             // Instead, we need to disable all indexes on the 
view.
@@ -946,6 +964,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                 if (cell.getValueLength() > 0) {
                                     disableIndexTimestamp = 
PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), 
cell.getValueOffset(), SortOrder.getDefault());
                                     if (disableIndexTimestamp != 0) {
+                                        logger.info("Major compaction running 
while index on table is disabled.  Clearing index disable timestamp: " + 
tableName);
                                         mutationCode = 
IndexUtil.updateIndexState(tableKey, 0L, htable, 
PIndexState.DISABLE).getMutationCode();
                                     }
                                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 9c851c4..b50b900 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -87,6 +87,7 @@ import 
org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
 import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import 
org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
@@ -194,6 +195,7 @@ public class Indexer extends BaseRegionObserver {
   private long slowPostOpenThreshold;
   private long slowPreIncrementThreshold;
   private int rowLockWaitDuration;
+  private Configuration compactionConfig;
   
   public static final String RecoveryFailurePolicyKeyForTesting = 
INDEX_RECOVERY_FAILURE_POLICY_KEY;
 
@@ -250,6 +252,15 @@ public class Indexer extends BaseRegionObserver {
         this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
         setSlowThresholds(e.getConfiguration());
 
+        compactionConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
+        // lower the number of rpc retries, so we don't hang the compaction
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            
e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRIES_NUMBER,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRIES_NUMBER));
+        compactionConfig.setInt(HConstants.HBASE_CLIENT_PAUSE,
+            
e.getConfiguration().getInt(QueryServices.METADATA_WRITE_RETRY_PAUSE,
+                QueryServicesOptions.DEFAULT_METADATA_WRITE_RETRY_PAUSE));
+
         try {
           // get the specified failure policy. We only ever override it in 
tests, but we need to do it
           // here
@@ -868,12 +879,13 @@ public class Indexer extends BaseRegionObserver {
                   public Void run() throws Exception {
                       String fullTableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                       try {
-                          PhoenixConnection conn =  
QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class);
+                          PhoenixConnection conn =  
QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class);
                           PTable table = PhoenixRuntime.getTableNoCache(conn, 
fullTableName);
                           // FIXME: we may need to recurse into children of 
this table too
                           for (PTable index : table.getIndexes()) {
                               if (index.getIndexDisableTimestamp() != 0) {
                                   try {
+                                      LOG.info("Major compaction running while 
index on table is disabled.  Clearing index disable timestamp: " + 
fullTableName);
                                       IndexUtil.updateIndexState(conn, 
index.getName().getString(), PIndexState.DISABLE, Long.valueOf(0L));
                                   } catch (SQLException e) {
                                       LOG.warn("Unable to permanently disable 
index " + index.getName().getString(), e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index ab5f51a..83887b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -161,6 +161,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String METADATA_PRIOIRTY_ATTRIB = 
"phoenix.metadata.rpc.priority";
     public static final String ALLOW_LOCAL_INDEX_ATTRIB = 
"phoenix.index.allowLocalIndex";
 
+    // Retries when doing server side writes to SYSTEM.CATALOG
+    public static final String METADATA_WRITE_RETRIES_NUMBER = 
"phoenix.metadata.rpc.retries.number";
+    public static final String METADATA_WRITE_RETRY_PAUSE = 
"phoenix.metadata.rpc.pause";
+
     // Config parameters for for configuring tracing
     public static final String TRACING_FREQ_ATTRIB = "phoenix.trace.frequency";
     public static final String TRACING_PAGE_SIZE_ATTRIB = 
"phoenix.trace.read.pagesize";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/63a409a4/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b9a578f..5896dae 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -195,6 +195,11 @@ public class QueryServicesOptions {
     public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
     public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
 
+    // Retries when doing server side writes to SYSTEM.CATALOG
+    // 20 retries with 100 pause = 230 seconds total retry time
+    public static final int DEFAULT_METADATA_WRITE_RETRIES_NUMBER = 20;
+    public static final int DEFAULT_METADATA_WRITE_RETRY_PAUSE = 100;
+
     public static final int DEFAULT_TRACING_PAGE_SIZE = 100;
     /**
      * Configuration key to overwrite the tablename that should be used as the 
target table

Reply via email to