charlesconnell commented on code in PR #8044:
URL: https://github.com/apache/hbase/pull/8044#discussion_r3054364562


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java:
##########
@@ -4483,13 +4505,17 @@ public void onConfigurationChange(Configuration 
newConf) {
     }
     // append the quotas observer back to the master coprocessor key
     setQuotasObserver(newConf);
-    // update region server coprocessor if the configuration has changed.
-    if (
-      CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost, 
newConf,
-        CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode
-    ) {
-      LOG.info("Update the master coprocessor(s) because the configuration has 
changed");
-      this.cpHost = new MasterCoprocessorHost(this, newConf);
+
+    boolean originalIsReadOnlyEnabled = this.isGlobalReadOnlyEnabled;
+
+    CoprocessorConfigurationUtil.maybeUpdateCoprocessors(newConf, 
this.isGlobalReadOnlyEnabled,
+      this.cpHost, CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, 
this.maintenanceMode,
+      this.toString(), val -> this.isGlobalReadOnlyEnabled = val,
+      conf -> initializeCoprocessorHost(newConf));

Review Comment:
   Use the lambda parameter `conf` here instead of the captured `newConf`



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AbstractReadOnlyController.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Set;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ActiveClusterSuffix;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected](HBaseInterfaceAudience.CONFIG)
+public abstract class AbstractReadOnlyController implements Coprocessor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractReadOnlyController.class);
+
+  private static final Set<TableName> writableTables =
+    Set.of(TableName.META_TABLE_NAME, MasterRegionFactory.TABLE_NAME);
+
+  public static boolean
+    isWritableInReadOnlyMode(final ObserverContext<? extends 
RegionCoprocessorEnvironment> c) {
+    return 
writableTables.contains(c.getEnvironment().getRegionInfo().getTable());
+  }
+
+  public static boolean isWritableInReadOnlyMode(final TableName tableName) {
+    return writableTables.contains(tableName);
+  }
+
+  protected void internalReadOnlyGuard() throws DoNotRetryIOException {
+    throw new DoNotRetryIOException("Operation not allowed in Read-Only Mode");

Review Comment:
   I think it would be nice to subclass DoNotRetryIOException with something 
more specific to this situation, like `WriteAttemptedOnReadOnlyClusterException`.



##########
hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java:
##########
@@ -64,10 +70,42 @@ public final class TableName implements 
Comparable<TableName> {
   // with NAMESPACE_DELIM as delimiter
   public static final String VALID_USER_TABLE_REGEX = "(?:(?:(?:" + 
VALID_NAMESPACE_REGEX + "\\"
     + NAMESPACE_DELIM + ")?)" + "(?:" + VALID_TABLE_QUALIFIER_REGEX + "))";
+  public static final String VALID_META_TABLE_SUFFIX_REGEX = "[a-zA-Z0-9]+";
 
-  /** The hbase:meta table's name. */
-  public static final TableName META_TABLE_NAME =
-    valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "meta");
+  /**
+   * The name of hbase meta table could either be hbase:meta_xxx or 
'hbase:meta' otherwise. Config
+   * hbase.meta.table.suffix will govern the decision of adding suffix to the 
habase:meta
+   */
+  public static final TableName META_TABLE_NAME;
+  static {
+    Configuration conf = HBaseConfiguration.create();

Review Comment:
   When possible I try to avoid using `HBaseConfiguration.create()` because 
it's so inflexible about what config files/values it will read. How necessary 
is it to load the meta table name at static initialization time?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java:
##########
@@ -4483,13 +4505,17 @@ public void onConfigurationChange(Configuration 
newConf) {
     }
     // append the quotas observer back to the master coprocessor key
     setQuotasObserver(newConf);
-    // update region server coprocessor if the configuration has changed.
-    if (
-      CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost, 
newConf,
-        CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode
-    ) {
-      LOG.info("Update the master coprocessor(s) because the configuration has 
changed");

Review Comment:
   Might be nice to keep this logging?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/util/CoprocessorConfigurationUtil.java:
##########
@@ -102,4 +116,129 @@ private static boolean 
hasCoprocessorsConfigured(Configuration conf, String... c
     }
     return false;
   }
+
+  private static List<String> getCoprocessorsFromConfig(Configuration conf,
+    String configurationKey) {
+    String[] existing = conf.getStrings(configurationKey);
+    return existing != null ? new ArrayList<>(Arrays.asList(existing)) : new 
ArrayList<>();
+  }
+
+  public static void addCoprocessors(Configuration conf, String 
configurationKey,
+    List<String> coprocessorsToAdd) {
+    List<String> existing = getCoprocessorsFromConfig(conf, configurationKey);
+
+    boolean isModified = false;
+
+    for (String coprocessor : coprocessorsToAdd) {
+      if (!existing.contains(coprocessor)) {
+        existing.add(coprocessor);
+        isModified = true;
+      }
+    }
+
+    if (isModified) {
+      conf.setStrings(configurationKey, existing.toArray(new String[0]));
+    }
+  }
+
+  public static void removeCoprocessors(Configuration conf, String 
configurationKey,
+    List<String> coprocessorsToRemove) {
+    List<String> existing = getCoprocessorsFromConfig(conf, configurationKey);
+
+    if (existing.isEmpty()) {
+      return;
+    }
+
+    boolean isModified = false;
+
+    for (String coprocessor : coprocessorsToRemove) {
+      if (existing.contains(coprocessor)) {
+        existing.remove(coprocessor);
+        isModified = true;
+      }
+    }
+
+    if (isModified) {
+      conf.setStrings(configurationKey, existing.toArray(new String[0]));
+    }
+  }
+
+  private static List<String> getReadOnlyCoprocessors(String configurationKey) 
{
+    return switch (configurationKey) {
+      case CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY -> List
+        .of(MasterReadOnlyController.class.getName());
+
+      case CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY -> List
+        .of(RegionServerReadOnlyController.class.getName());
+
+      case CoprocessorHost.REGION_COPROCESSOR_CONF_KEY -> List.of(
+        RegionReadOnlyController.class.getName(), 
BulkLoadReadOnlyController.class.getName(),
+        EndpointReadOnlyController.class.getName());
+
+      default -> throw new IllegalArgumentException(
+        "Unsupported coprocessor configuration key: " + configurationKey);
+    };
+  }
+
+  /**
+   * This method adds or removes relevant ReadOnlyController coprocessors to 
the provided
+   * configuration based on whether read-only mode is enabled.
+   * @param conf               The up-to-date configuration used to determine 
how to handle
+   *                           coprocessors
+   * @param coprocessorConfKey The configuration key name
+   */
+  public static void syncReadOnlyConfigurations(Configuration conf, String 
coprocessorConfKey) {
+    boolean isReadOnlyModeEnabled = 
conf.getBoolean(HConstants.HBASE_GLOBAL_READONLY_ENABLED_KEY,
+      HConstants.HBASE_GLOBAL_READONLY_ENABLED_DEFAULT);
+
+    List<String> cpList = getReadOnlyCoprocessors(coprocessorConfKey);
+    if (isReadOnlyModeEnabled) {
+      CoprocessorConfigurationUtil.addCoprocessors(conf, coprocessorConfKey, 
cpList);
+    } else {
+      CoprocessorConfigurationUtil.removeCoprocessors(conf, 
coprocessorConfKey, cpList);
+    }
+  }
+
+  /**
+   * This method updates the coprocessors on the master, region server, or 
region if a change has
+   * been detected. Detected changes include changes in coprocessors or 
changes in read-only mode
+   * configuration. If a change is detected, then new coprocessors are loaded 
using the provided
+   * reload method. The new value for the read-only config variable is updated 
as well.
+   * @param newConf                   an updated configuration
+   * @param originalIsReadOnlyEnabled the original value for
+   *                                  {@value 
HConstants#HBASE_GLOBAL_READONLY_ENABLED_KEY}
+   * @param coprocessorHost           the coprocessor host for HMaster, 
HRegionServer, or HRegion
+   * @param coprocessorConfKey        configuration key used for setting 
master, region server, or
+   *                                  region coprocessors
+   * @param isMaintenanceMode         whether maintenance mode is active 
(mainly for HMaster)
+   * @param instance                  string value of the instance calling 
this method (mainly helps
+   *                                  with tracking region logging)
+   * @param stateSetter               lambda function that sets the read-only 
instance variable with
+   *                                  an updated value from the config
+   * @param reloadTask                lambda function that reloads 
coprocessors on the master,
+   *                                  region server, or region
+   */
+  public static void maybeUpdateCoprocessors(Configuration newConf,
+    boolean originalIsReadOnlyEnabled, CoprocessorHost<?, ?> coprocessorHost,
+    String coprocessorConfKey, boolean isMaintenanceMode, String instance,
+    Consumer<Boolean> stateSetter, CoprocessorReloadTask reloadTask) {
+
+    boolean maybeUpdatedReadOnlyMode = 
ConfigurationUtil.isReadOnlyModeEnabled(newConf);
+    boolean hasReadOnlyModeChanged = originalIsReadOnlyEnabled != 
maybeUpdatedReadOnlyMode;

Review Comment:
   I think that if this method/class must know whether the read-only mode has 
changed, it should track that itself rather than depend on the caller to help 
track it. Maybe you could look at what coprocessors are currently loaded to find 
out whether read-only mode is enabled without explicitly tracking it.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RefreshHFilesCallable.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+/**
+ * This is a RegionServer-side callable used by the HBase Master Procedure 
framework to perform the
+ * actual HFiles refresh operation on a specific region. It is dispatched from 
the
+ * {@link 
org.apache.hadoop.hbase.master.procedure.RefreshHFilesRegionProcedure} to the 
RegionServer
+ * and executes the logic to refresh store files in each store of the region.
+ */
+
[email protected]
+public class RefreshHFilesCallable extends BaseRSProcedureCallable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RefreshHFilesCallable.class);
+
+  private RegionInfo regionInfo;
+
+  @Override
+  protected byte[] doCall() throws Exception {
+    HRegion region = rs.getRegion(regionInfo.getEncodedName());
+    LOG.debug("Starting refreshHfiles operation on region {}", region);
+
+    try {
+      for (Store store : region.getStores()) {
+        store.refreshStoreFiles();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Exception while trying to refresh store files: ", ioe);
+    }

Review Comment:
   Could you talk about the decision to swallow the error here? I'm on the 
fence about whether that is the right choice.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java:
##########
@@ -130,6 +132,39 @@ public Set<String> getCoprocessorClassNames() {
     return returnValue;
   }
 
+  /**
+   * Used to help make the relevant loaded coprocessors dynamically 
configurable by registering them
+   * to the {@link ConfigurationManager}. Coprocessors are considered 
"relevant" if they implement
+   * the {@link ConfigurationObserver} interface.
+   * @param configurationManager the ConfigurationManager the coprocessors get 
registered to
+   */
+  public void registerConfigurationObservers(ConfigurationManager 
configurationManager) {
+    Coprocessor foundCp;
+    Set<String> coprocessors = this.getCoprocessors();
+    for (String cp : coprocessors) {
+      foundCp = this.findCoprocessor(cp);
+      if (foundCp instanceof ConfigurationObserver) {
+        configurationManager.registerObserver((ConfigurationObserver) foundCp);
+      }
+    }
+  }
+
+  /**
+   * Deregisters relevant coprocessors from the {@link ConfigurationManager}. 
Coprocessors are
+   * considered "relevant" if they implement the {@link ConfigurationObserver} 
interface.
+   * @param configurationManager the ConfigurationManager the coprocessors get 
deregistered from
+   */
+  public void deregisterConfigurationObservers(ConfigurationManager 
configurationManager) {
+    Coprocessor foundCp;
+    Set<String> coprocessors = this.getCoprocessors();
+    for (String cp : coprocessors) {
+      foundCp = this.findCoprocessor(cp);
+      if (foundCp instanceof ConfigurationObserver) {
+        configurationManager.deregisterObserver((ConfigurationObserver) 
foundCp);
+      }
+    }

Review Comment:
   It looks like none of your coprocessors implement ConfigurationObserver. Was 
this meant as speculative infrastructure, or left in by accident?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshMetaProcedure.java:
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState.WAITING_TIMEOUT;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaState;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaStateData;
+
[email protected]
+public class RefreshMetaProcedure extends 
AbstractStateMachineTableProcedure<RefreshMetaState> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RefreshMetaProcedure.class);
+  private static final String HIDDEN_DIR_PATTERN = "^[._-].*";
+
+  private List<RegionInfo> currentRegions;
+  private List<RegionInfo> latestRegions;
+  private List<Mutation> pendingMutations;
+  private RetryCounter retryCounter;
+  private static final int MUTATION_BATCH_SIZE = 100;
+  private List<RegionInfo> newlyAddedRegions;
+  private List<TableName> deletedTables;
+
+  public RefreshMetaProcedure() {
+    super();
+  }
+
+  public RefreshMetaProcedure(MasterProcedureEnv env) {
+    super(env);
+  }
+
+  @Override
+  public TableName getTableName() {
+    return TableName.META_TABLE_NAME;
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.EDIT;
+  }
+
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, RefreshMetaState 
refreshMetaState) {
+    LOG.info("Executing RefreshMetaProcedure state: {}", refreshMetaState);
+
+    try {
+      return switch (refreshMetaState) {
+        case REFRESH_META_INIT -> executeInit(env);
+        case REFRESH_META_SCAN_STORAGE -> executeScanStorage(env);
+        case REFRESH_META_PREPARE -> executePrepare();
+        case REFRESH_META_APPLY -> executeApply(env);
+        case REFRESH_META_FOLLOWUP -> executeFollowup(env);
+        case REFRESH_META_FINISH -> executeFinish(env);
+        default -> throw new UnsupportedOperationException("Unhandled state: " 
+ refreshMetaState);
+      };
+    } catch (Exception ex) {
+      LOG.error("Error in RefreshMetaProcedure state {}", refreshMetaState, 
ex);
+      setFailure("RefreshMetaProcedure", ex);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  private Flow executeInit(MasterProcedureEnv env) throws IOException {
+    LOG.trace("Getting current regions from {} table", 
TableName.META_TABLE_NAME);
+    try {
+      currentRegions = 
getCurrentRegions(env.getMasterServices().getConnection());
+      LOG.info("Found {} current regions in meta table", 
currentRegions.size());
+      setNextState(RefreshMetaState.REFRESH_META_SCAN_STORAGE);
+      return Flow.HAS_MORE_STATE;
+    } catch (IOException ioe) {
+      LOG.error("Failed to get current regions from meta table", ioe);
+      throw ioe;
+    }
+  }
+
+  private Flow executeScanStorage(MasterProcedureEnv env) throws IOException {
+    try {
+      latestRegions = 
scanBackingStorage(env.getMasterServices().getConnection());
+      LOG.info("Found {} regions in backing storage", latestRegions.size());
+      setNextState(RefreshMetaState.REFRESH_META_PREPARE);
+      return Flow.HAS_MORE_STATE;
+    } catch (IOException ioe) {
+      LOG.error("Failed to scan backing storage", ioe);
+      throw ioe;
+    }
+  }
+
+  private Flow executePrepare() throws IOException {
+    if (currentRegions == null || latestRegions == null) {
+      LOG.error(
+        "Can not execute update on null lists. " + "Meta Table Regions - {}, 
Storage Regions - {}",
+        currentRegions, latestRegions);
+      throw new IOException(
+        (currentRegions == null ? "current regions" : "latest regions") + " 
list is null");
+    }
+    LOG.info("Comparing regions. Current regions: {}, Latest regions: {}", 
currentRegions.size(),
+      latestRegions.size());
+
+    this.newlyAddedRegions = new ArrayList<>();
+    this.deletedTables = new ArrayList<>();
+
+    pendingMutations = prepareMutations(
+      currentRegions.stream()
+        .collect(Collectors.toMap(RegionInfo::getEncodedName, 
Function.identity())),
+      latestRegions.stream()
+        .collect(Collectors.toMap(RegionInfo::getEncodedName, 
Function.identity())));
+
+    if (pendingMutations.isEmpty()) {
+      LOG.info("RefreshMetaProcedure completed, No update needed.");
+      setNextState(RefreshMetaState.REFRESH_META_FINISH);
+    } else {
+      LOG.info("Prepared {} region mutations and {} tables for cleanup.", 
pendingMutations.size(),
+        deletedTables.size());
+      setNextState(RefreshMetaState.REFRESH_META_APPLY);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  private Flow executeApply(MasterProcedureEnv env) throws 
ProcedureSuspendedException {
+    try {
+      if (pendingMutations != null && !pendingMutations.isEmpty()) {
+        applyMutations(env.getMasterServices().getConnection(), 
pendingMutations);
+        LOG.debug("RefreshMetaProcedure applied {} mutations to meta table",
+          pendingMutations.size());
+      }
+    } catch (IOException ioe) {
+      if (retryCounter == null) {
+        retryCounter = 
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+      }
+      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+      LOG.warn("Failed to apply mutations to meta table, suspending for {} 
ms", backoff, ioe);
+      setTimeout(Math.toIntExact(backoff));
+      setState(WAITING_TIMEOUT);
+      skipPersistence();
+      throw new ProcedureSuspendedException();
+    }
+
+    if (
+      (this.newlyAddedRegions != null && !this.newlyAddedRegions.isEmpty())
+        || (this.deletedTables != null && !this.deletedTables.isEmpty())
+    ) {
+      setNextState(RefreshMetaState.REFRESH_META_FOLLOWUP);
+    } else {
+      LOG.info("RefreshMetaProcedure completed. No follow-up actions were 
required.");
+      setNextState(RefreshMetaState.REFRESH_META_FINISH);
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  private Flow executeFollowup(MasterProcedureEnv env) throws IOException {
+
+    LOG.info("Submitting assignment for new regions: {}", 
this.newlyAddedRegions);
+    
addChildProcedure(env.getAssignmentManager().createAssignProcedures(newlyAddedRegions));
+
+    for (TableName tableName : this.deletedTables) {
+      LOG.debug("Submitting deletion for empty table {}", tableName);
+      env.getMasterServices().getAssignmentManager().deleteTable(tableName);
+      
env.getMasterServices().getTableStateManager().setDeletedTable(tableName);
+      env.getMasterServices().getTableDescriptors().remove(tableName);
+    }
+    setNextState(RefreshMetaState.REFRESH_META_FINISH);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  private Flow executeFinish(MasterProcedureEnv env) {
+    invalidateTableDescriptorCache(env);
+    LOG.info("RefreshMetaProcedure completed successfully. All follow-up 
actions finished.");
+    currentRegions = null;
+    latestRegions = null;
+    pendingMutations = null;
+    deletedTables = null;
+    newlyAddedRegions = null;
+    return Flow.NO_MORE_STATE;
+  }
+
+  private void invalidateTableDescriptorCache(MasterProcedureEnv env) {
+    LOG.debug("Invalidating the table descriptor cache to ensure new tables 
are discovered");
+    
env.getMasterServices().getTableDescriptors().invalidateTableDescriptorCache();
+  }
+
+  /**
+   * Prepares mutations by comparing the current regions in hbase:meta with 
the latest regions from
+   * backing storage. Also populates newlyAddedRegions and deletedTables lists 
for follow-up
+   * actions.
+   * @param currentMap Current regions from hbase:meta
+   * @param latestMap  Latest regions from backing storage
+   * @return List of mutations to apply to the meta table
+   * @throws IOException If there is an error creating mutations
+   */
+  private List<Mutation> prepareMutations(Map<String, RegionInfo> currentMap,
+    Map<String, RegionInfo> latestMap) throws IOException {
+    List<Mutation> mutations = new ArrayList<>();
+
+    for (String regionId : Stream.concat(currentMap.keySet().stream(), 
latestMap.keySet().stream())
+      .collect(Collectors.toSet())) {
+      RegionInfo currentRegion = currentMap.get(regionId);
+      RegionInfo latestRegion = latestMap.get(regionId);
+
+      if (latestRegion != null) {
+        if (currentRegion == null || hasBoundaryChanged(currentRegion, 
latestRegion)) {
+          mutations.add(MetaTableAccessor.makePutFromRegionInfo(latestRegion));
+          newlyAddedRegions.add(latestRegion);
+        }
+      } else {
+        mutations.add(MetaTableAccessor.makeDeleteFromRegionInfo(currentRegion,
+          EnvironmentEdgeManager.currentTime()));
+      }
+    }
+
+    if (!currentMap.isEmpty() || !latestMap.isEmpty()) {
+      Set<TableName> currentTables =
+        
currentMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
+      Set<TableName> latestTables =
+        
latestMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
+
+      Set<TableName> tablesToDeleteState = new HashSet<>(currentTables);
+      tablesToDeleteState.removeAll(latestTables);
+      if (!tablesToDeleteState.isEmpty()) {
+        LOG.warn(
+          "The following tables have no regions on storage and WILL BE REMOVED 
from the meta: {}",
+          tablesToDeleteState);
+        this.deletedTables.addAll(tablesToDeleteState);
+      }
+
+      Set<TableName> tablesToRestoreState = new HashSet<>(latestTables);
+      tablesToRestoreState.removeAll(currentTables);
+      if (!tablesToRestoreState.isEmpty()) {
+        LOG.info("Adding missing table:state entry for recovered tables: {}", 
tablesToRestoreState);
+        for (TableName tableName : tablesToRestoreState) {
+          TableState tableState = new TableState(tableName, 
TableState.State.ENABLED);
+          mutations.add(MetaTableAccessor.makePutFromTableState(tableState,
+            EnvironmentEdgeManager.currentTime()));
+        }
+      }
+    }
+    return mutations;
+  }
+
+  private void applyMutations(Connection connection, List<Mutation> mutations) 
throws IOException {
+    List<List<Mutation>> chunks = Lists.partition(mutations, 
MUTATION_BATCH_SIZE);
+
+    for (int i = 0; i < chunks.size(); i++) {
+      List<Mutation> chunk = chunks.get(i);
+
+      List<Put> puts =
+        chunk.stream().filter(m -> m instanceof Put).map(m -> (Put) 
m).collect(Collectors.toList());
+
+      List<Delete> deletes = chunk.stream().filter(m -> m instanceof 
Delete).map(m -> (Delete) m)
+        .collect(Collectors.toList());
+
+      if (!puts.isEmpty()) {
+        MetaTableAccessor.putsToMetaTable(connection, puts);
+      }
+      if (!deletes.isEmpty()) {
+        MetaTableAccessor.deleteFromMetaTable(connection, deletes);
+      }
+      LOG.debug("Successfully processed batch {}/{}", i + 1, chunks.size());
+    }
+  }
+
+  boolean hasBoundaryChanged(RegionInfo region1, RegionInfo region2) {
+    return !Arrays.equals(region1.getStartKey(), region2.getStartKey())
+      || !Arrays.equals(region1.getEndKey(), region2.getEndKey());
+  }
+
+  /**
+   * Scans the backing storage for all regions and returns a list of 
RegionInfo objects. This method
+   * scans the filesystem for region directories and reads their .regioninfo 
files.
+   * @param connection The HBase connection to use.
+   * @return List of RegionInfo objects found in the backing storage.
+   * @throws IOException If there is an error accessing the filesystem or 
reading region info files.
+   */
+  List<RegionInfo> scanBackingStorage(Connection connection) throws 
IOException {
+    List<RegionInfo> regions = new ArrayList<>();
+    Configuration conf = connection.getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    Path dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
+
+    LOG.info("Scanning backing storage under: {}", dataDir);
+
+    if (!fs.exists(dataDir)) {
+      LOG.warn("Data directory does not exist: {}", dataDir);
+      return regions;
+    }
+
+    FileStatus[] namespaceDirs =
+      fs.listStatus(dataDir, path -> 
!path.getName().matches(HIDDEN_DIR_PATTERN));
+    LOG.debug("Found {} namespace directories in data dir", 
Arrays.stream(namespaceDirs).toList());
+
+    for (FileStatus nsDir : namespaceDirs) {
+      String namespaceName = nsDir.getPath().getName();
+      if (NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR.equals(namespaceName)) 
{
+        LOG.info("Skipping system namespace {}", namespaceName);
+        continue;
+      }
+      try {
+        List<RegionInfo> namespaceRegions = scanTablesInNamespace(fs, 
nsDir.getPath());
+        regions.addAll(namespaceRegions);
+        LOG.debug("Found {} regions in namespace {}", namespaceRegions.size(),
+          nsDir.getPath().getName());
+      } catch (IOException e) {
+        LOG.error("Failed to scan namespace directory: {}", nsDir.getPath(), 
e);
+      }
+    }
+    LOG.info("Scanned backing storage and found {} regions", regions.size());
+    return regions;
+  }
+
+  private List<RegionInfo> scanTablesInNamespace(FileSystem fs, Path 
namespacePath)
+    throws IOException {
+    LOG.debug("Scanning namespace {}", namespacePath.getName());
+    List<Path> tableDirs = FSUtils.getLocalTableDirs(fs, namespacePath);
+
+    return tableDirs.parallelStream().flatMap(tableDir -> {

Review Comment:
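   Delegating into the common ForkJoinPool feels dicey here: that pool is shared 
across the whole JVM, and this per-table work reads region info from the 
filesystem. I would feel safer if this were a regular stream().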



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to