charlesconnell commented on code in PR #8044:
URL: https://github.com/apache/hbase/pull/8044#discussion_r3054364562
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java:
##########
@@ -4483,13 +4505,17 @@ public void onConfigurationChange(Configuration
newConf) {
}
// append the quotas observer back to the master coprocessor key
setQuotasObserver(newConf);
- // update region server coprocessor if the configuration has changed.
- if (
- CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost,
newConf,
- CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode
- ) {
- LOG.info("Update the master coprocessor(s) because the configuration has
changed");
- this.cpHost = new MasterCoprocessorHost(this, newConf);
+
+ boolean originalIsReadOnlyEnabled = this.isGlobalReadOnlyEnabled;
+
+ CoprocessorConfigurationUtil.maybeUpdateCoprocessors(newConf,
this.isGlobalReadOnlyEnabled,
+ this.cpHost, CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
this.maintenanceMode,
+ this.toString(), val -> this.isGlobalReadOnlyEnabled = val,
+ conf -> initializeCoprocessorHost(newConf));
Review Comment:
Use the lambda's `conf` parameter here instead of the captured `newConf`.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AbstractReadOnlyController.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.access;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Set;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ActiveClusterSuffix;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public abstract class AbstractReadOnlyController implements Coprocessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractReadOnlyController.class);
+
+ private static final Set<TableName> writableTables =
+ Set.of(TableName.META_TABLE_NAME, MasterRegionFactory.TABLE_NAME);
+
+ public static boolean
+ isWritableInReadOnlyMode(final ObserverContext<? extends
RegionCoprocessorEnvironment> c) {
+ return
writableTables.contains(c.getEnvironment().getRegionInfo().getTable());
+ }
+
+ public static boolean isWritableInReadOnlyMode(final TableName tableName) {
+ return writableTables.contains(tableName);
+ }
+
+ protected void internalReadOnlyGuard() throws DoNotRetryIOException {
+ throw new DoNotRetryIOException("Operation not allowed in Read-Only Mode");
Review Comment:
I think it would be nice to subclass DoNotRetryIOException with something
more specific to this situation, like `WriteAttemptedOnReadOnlyClusterException`
##########
hbase-common/src/main/java/org/apache/hadoop/hbase/TableName.java:
##########
@@ -64,10 +70,42 @@ public final class TableName implements
Comparable<TableName> {
// with NAMESPACE_DELIM as delimiter
public static final String VALID_USER_TABLE_REGEX = "(?:(?:(?:" +
VALID_NAMESPACE_REGEX + "\\"
+ NAMESPACE_DELIM + ")?)" + "(?:" + VALID_TABLE_QUALIFIER_REGEX + "))";
+ public static final String VALID_META_TABLE_SUFFIX_REGEX = "[a-zA-Z0-9]+";
- /** The hbase:meta table's name. */
- public static final TableName META_TABLE_NAME =
- valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "meta");
+ /**
+ * The name of hbase meta table could either be hbase:meta_xxx or
'hbase:meta' otherwise. Config
+ * hbase.meta.table.suffix will govern the decision of adding suffix to the
hbase:meta
+ */
+ public static final TableName META_TABLE_NAME;
+ static {
+ Configuration conf = HBaseConfiguration.create();
Review Comment:
When possible I try to avoid using `HBaseConfiguration.create()` because
it's so inflexible about what config files/values it will read. How necessary
is it to load the meta table name at static initialization time?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java:
##########
@@ -4483,13 +4505,17 @@ public void onConfigurationChange(Configuration
newConf) {
}
// append the quotas observer back to the master coprocessor key
setQuotasObserver(newConf);
- // update region server coprocessor if the configuration has changed.
- if (
- CoprocessorConfigurationUtil.checkConfigurationChange(this.cpHost,
newConf,
- CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY) && !maintenanceMode
- ) {
- LOG.info("Update the master coprocessor(s) because the configuration has
changed");
Review Comment:
Might be nice to keep this logging?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/util/CoprocessorConfigurationUtil.java:
##########
@@ -102,4 +116,129 @@ private static boolean
hasCoprocessorsConfigured(Configuration conf, String... c
}
return false;
}
+
+ private static List<String> getCoprocessorsFromConfig(Configuration conf,
+ String configurationKey) {
+ String[] existing = conf.getStrings(configurationKey);
+ return existing != null ? new ArrayList<>(Arrays.asList(existing)) : new
ArrayList<>();
+ }
+
+ public static void addCoprocessors(Configuration conf, String
configurationKey,
+ List<String> coprocessorsToAdd) {
+ List<String> existing = getCoprocessorsFromConfig(conf, configurationKey);
+
+ boolean isModified = false;
+
+ for (String coprocessor : coprocessorsToAdd) {
+ if (!existing.contains(coprocessor)) {
+ existing.add(coprocessor);
+ isModified = true;
+ }
+ }
+
+ if (isModified) {
+ conf.setStrings(configurationKey, existing.toArray(new String[0]));
+ }
+ }
+
+ public static void removeCoprocessors(Configuration conf, String
configurationKey,
+ List<String> coprocessorsToRemove) {
+ List<String> existing = getCoprocessorsFromConfig(conf, configurationKey);
+
+ if (existing.isEmpty()) {
+ return;
+ }
+
+ boolean isModified = false;
+
+ for (String coprocessor : coprocessorsToRemove) {
+ if (existing.contains(coprocessor)) {
+ existing.remove(coprocessor);
+ isModified = true;
+ }
+ }
+
+ if (isModified) {
+ conf.setStrings(configurationKey, existing.toArray(new String[0]));
+ }
+ }
+
+ private static List<String> getReadOnlyCoprocessors(String configurationKey)
{
+ return switch (configurationKey) {
+ case CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY -> List
+ .of(MasterReadOnlyController.class.getName());
+
+ case CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY -> List
+ .of(RegionServerReadOnlyController.class.getName());
+
+ case CoprocessorHost.REGION_COPROCESSOR_CONF_KEY -> List.of(
+ RegionReadOnlyController.class.getName(),
BulkLoadReadOnlyController.class.getName(),
+ EndpointReadOnlyController.class.getName());
+
+ default -> throw new IllegalArgumentException(
+ "Unsupported coprocessor configuration key: " + configurationKey);
+ };
+ }
+
+ /**
+ * This method adds or removes relevant ReadOnlyController coprocessors to
the provided
+ * configuration based on whether read-only mode is enabled.
+ * @param conf The up-to-date configuration used to determine
how to handle
+ * coprocessors
+ * @param coprocessorConfKey The configuration key name
+ */
+ public static void syncReadOnlyConfigurations(Configuration conf, String
coprocessorConfKey) {
+ boolean isReadOnlyModeEnabled =
conf.getBoolean(HConstants.HBASE_GLOBAL_READONLY_ENABLED_KEY,
+ HConstants.HBASE_GLOBAL_READONLY_ENABLED_DEFAULT);
+
+ List<String> cpList = getReadOnlyCoprocessors(coprocessorConfKey);
+ if (isReadOnlyModeEnabled) {
+ CoprocessorConfigurationUtil.addCoprocessors(conf, coprocessorConfKey,
cpList);
+ } else {
+ CoprocessorConfigurationUtil.removeCoprocessors(conf,
coprocessorConfKey, cpList);
+ }
+ }
+
+ /**
+ * This method updates the coprocessors on the master, region server, or
region if a change has
+ * been detected. Detected changes include changes in coprocessors or
changes in read-only mode
+ * configuration. If a change is detected, then new coprocessors are loaded
using the provided
+ * reload method. The new value for the read-only config variable is updated
as well.
+ * @param newConf an updated configuration
+ * @param originalIsReadOnlyEnabled the original value for
+ * {@value
HConstants#HBASE_GLOBAL_READONLY_ENABLED_KEY}
+ * @param coprocessorHost the coprocessor host for HMaster,
HRegionServer, or HRegion
+ * @param coprocessorConfKey configuration key used for setting
master, region server, or
+ * region coprocessors
+ * @param isMaintenanceMode whether maintenance mode is active
(mainly for HMaster)
+ * @param instance string value of the instance calling
this method (mainly helps
+ * with tracking region logging)
+ * @param stateSetter lambda function that sets the read-only
instance variable with
+ * an updated value from the config
+ * @param reloadTask lambda function that reloads
coprocessors on the master,
+ * region server, or region
+ */
+ public static void maybeUpdateCoprocessors(Configuration newConf,
+ boolean originalIsReadOnlyEnabled, CoprocessorHost<?, ?> coprocessorHost,
+ String coprocessorConfKey, boolean isMaintenanceMode, String instance,
+ Consumer<Boolean> stateSetter, CoprocessorReloadTask reloadTask) {
+
+ boolean maybeUpdatedReadOnlyMode =
ConfigurationUtil.isReadOnlyModeEnabled(newConf);
+ boolean hasReadOnlyModeChanged = originalIsReadOnlyEnabled !=
maybeUpdatedReadOnlyMode;
Review Comment:
I think that if this method/class must know whether the read-only mode has
changed, it should track that itself, and not depend on the caller to help
track it. Maybe you could look at what coprocessors are currently loaded to
find out whether read-only mode is enabled without explicitly tracking it.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RefreshHFilesCallable.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+/**
+ * This is a RegionServer-side callable used by the HBase Master Procedure
framework to perform the
+ * actual HFiles refresh operation on a specific region. It is dispatched from
the
+ * {@link
org.apache.hadoop.hbase.master.procedure.RefreshHFilesRegionProcedure} to the
RegionServer
+ * and executes the logic to refresh store files in each store of the region.
+ */
+
+@InterfaceAudience.Private
+public class RefreshHFilesCallable extends BaseRSProcedureCallable {
+ private static final Logger LOG =
LoggerFactory.getLogger(RefreshHFilesCallable.class);
+
+ private RegionInfo regionInfo;
+
+ @Override
+ protected byte[] doCall() throws Exception {
+ HRegion region = rs.getRegion(regionInfo.getEncodedName());
+ LOG.debug("Starting refreshHfiles operation on region {}", region);
+
+ try {
+ for (Store store : region.getStores()) {
+ store.refreshStoreFiles();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Exception while trying to refresh store files: ", ioe);
+ }
Review Comment:
Could you talk about the decision to swallow the error here? I'm on the
fence about whether that is the right choice.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java:
##########
@@ -130,6 +132,39 @@ public Set<String> getCoprocessorClassNames() {
return returnValue;
}
+ /**
+ * Used to help make the relevant loaded coprocessors dynamically
configurable by registering them
+ * to the {@link ConfigurationManager}. Coprocessors are considered
"relevant" if they implement
+ * the {@link ConfigurationObserver} interface.
+ * @param configurationManager the ConfigurationManager the coprocessors get
registered to
+ */
+ public void registerConfigurationObservers(ConfigurationManager
configurationManager) {
+ Coprocessor foundCp;
+ Set<String> coprocessors = this.getCoprocessors();
+ for (String cp : coprocessors) {
+ foundCp = this.findCoprocessor(cp);
+ if (foundCp instanceof ConfigurationObserver) {
+ configurationManager.registerObserver((ConfigurationObserver) foundCp);
+ }
+ }
+ }
+
+ /**
+ * Deregisters relevant coprocessors from the {@link ConfigurationManager}.
Coprocessors are
+ * considered "relevant" if they implement the {@link ConfigurationObserver}
interface.
+ * @param configurationManager the ConfigurationManager the coprocessors get
deregistered from
+ */
+ public void deregisterConfigurationObservers(ConfigurationManager
configurationManager) {
+ Coprocessor foundCp;
+ Set<String> coprocessors = this.getCoprocessors();
+ for (String cp : coprocessors) {
+ foundCp = this.findCoprocessor(cp);
+ if (foundCp instanceof ConfigurationObserver) {
+ configurationManager.deregisterObserver((ConfigurationObserver)
foundCp);
+ }
+ }
Review Comment:
It looks like none of your coprocessors implement ConfigurationObserver. Was
this meant as speculative infrastructure, or left in by accident?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshMetaProcedure.java:
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState.WAITING_TIMEOUT;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaStateData;
+
+@InterfaceAudience.Private
+public class RefreshMetaProcedure extends
AbstractStateMachineTableProcedure<RefreshMetaState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RefreshMetaProcedure.class);
+ private static final String HIDDEN_DIR_PATTERN = "^[._-].*";
+
+ private List<RegionInfo> currentRegions;
+ private List<RegionInfo> latestRegions;
+ private List<Mutation> pendingMutations;
+ private RetryCounter retryCounter;
+ private static final int MUTATION_BATCH_SIZE = 100;
+ private List<RegionInfo> newlyAddedRegions;
+ private List<TableName> deletedTables;
+
+ public RefreshMetaProcedure() {
+ super();
+ }
+
+ public RefreshMetaProcedure(MasterProcedureEnv env) {
+ super(env);
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.META_TABLE_NAME;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.EDIT;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, RefreshMetaState
refreshMetaState) {
+ LOG.info("Executing RefreshMetaProcedure state: {}", refreshMetaState);
+
+ try {
+ return switch (refreshMetaState) {
+ case REFRESH_META_INIT -> executeInit(env);
+ case REFRESH_META_SCAN_STORAGE -> executeScanStorage(env);
+ case REFRESH_META_PREPARE -> executePrepare();
+ case REFRESH_META_APPLY -> executeApply(env);
+ case REFRESH_META_FOLLOWUP -> executeFollowup(env);
+ case REFRESH_META_FINISH -> executeFinish(env);
+ default -> throw new UnsupportedOperationException("Unhandled state: "
+ refreshMetaState);
+ };
+ } catch (Exception ex) {
+ LOG.error("Error in RefreshMetaProcedure state {}", refreshMetaState,
ex);
+ setFailure("RefreshMetaProcedure", ex);
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ private Flow executeInit(MasterProcedureEnv env) throws IOException {
+ LOG.trace("Getting current regions from {} table",
TableName.META_TABLE_NAME);
+ try {
+ currentRegions =
getCurrentRegions(env.getMasterServices().getConnection());
+ LOG.info("Found {} current regions in meta table",
currentRegions.size());
+ setNextState(RefreshMetaState.REFRESH_META_SCAN_STORAGE);
+ return Flow.HAS_MORE_STATE;
+ } catch (IOException ioe) {
+ LOG.error("Failed to get current regions from meta table", ioe);
+ throw ioe;
+ }
+ }
+
+ private Flow executeScanStorage(MasterProcedureEnv env) throws IOException {
+ try {
+ latestRegions =
scanBackingStorage(env.getMasterServices().getConnection());
+ LOG.info("Found {} regions in backing storage", latestRegions.size());
+ setNextState(RefreshMetaState.REFRESH_META_PREPARE);
+ return Flow.HAS_MORE_STATE;
+ } catch (IOException ioe) {
+ LOG.error("Failed to scan backing storage", ioe);
+ throw ioe;
+ }
+ }
+
+ private Flow executePrepare() throws IOException {
+ if (currentRegions == null || latestRegions == null) {
+ LOG.error(
+ "Can not execute update on null lists. " + "Meta Table Regions - {},
Storage Regions - {}",
+ currentRegions, latestRegions);
+ throw new IOException(
+ (currentRegions == null ? "current regions" : "latest regions") + "
list is null");
+ }
+ LOG.info("Comparing regions. Current regions: {}, Latest regions: {}",
currentRegions.size(),
+ latestRegions.size());
+
+ this.newlyAddedRegions = new ArrayList<>();
+ this.deletedTables = new ArrayList<>();
+
+ pendingMutations = prepareMutations(
+ currentRegions.stream()
+ .collect(Collectors.toMap(RegionInfo::getEncodedName,
Function.identity())),
+ latestRegions.stream()
+ .collect(Collectors.toMap(RegionInfo::getEncodedName,
Function.identity())));
+
+ if (pendingMutations.isEmpty()) {
+ LOG.info("RefreshMetaProcedure completed, No update needed.");
+ setNextState(RefreshMetaState.REFRESH_META_FINISH);
+ } else {
+ LOG.info("Prepared {} region mutations and {} tables for cleanup.",
pendingMutations.size(),
+ deletedTables.size());
+ setNextState(RefreshMetaState.REFRESH_META_APPLY);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeApply(MasterProcedureEnv env) throws
ProcedureSuspendedException {
+ try {
+ if (pendingMutations != null && !pendingMutations.isEmpty()) {
+ applyMutations(env.getMasterServices().getConnection(),
pendingMutations);
+ LOG.debug("RefreshMetaProcedure applied {} mutations to meta table",
+ pendingMutations.size());
+ }
+ } catch (IOException ioe) {
+ if (retryCounter == null) {
+ retryCounter =
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ LOG.warn("Failed to apply mutations to meta table, suspending for {}
ms", backoff, ioe);
+ setTimeout(Math.toIntExact(backoff));
+ setState(WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+
+ if (
+ (this.newlyAddedRegions != null && !this.newlyAddedRegions.isEmpty())
+ || (this.deletedTables != null && !this.deletedTables.isEmpty())
+ ) {
+ setNextState(RefreshMetaState.REFRESH_META_FOLLOWUP);
+ } else {
+ LOG.info("RefreshMetaProcedure completed. No follow-up actions were
required.");
+ setNextState(RefreshMetaState.REFRESH_META_FINISH);
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFollowup(MasterProcedureEnv env) throws IOException {
+
+ LOG.info("Submitting assignment for new regions: {}",
this.newlyAddedRegions);
+
addChildProcedure(env.getAssignmentManager().createAssignProcedures(newlyAddedRegions));
+
+ for (TableName tableName : this.deletedTables) {
+ LOG.debug("Submitting deletion for empty table {}", tableName);
+ env.getMasterServices().getAssignmentManager().deleteTable(tableName);
+
env.getMasterServices().getTableStateManager().setDeletedTable(tableName);
+ env.getMasterServices().getTableDescriptors().remove(tableName);
+ }
+ setNextState(RefreshMetaState.REFRESH_META_FINISH);
+ return Flow.HAS_MORE_STATE;
+ }
+
+ private Flow executeFinish(MasterProcedureEnv env) {
+ invalidateTableDescriptorCache(env);
+ LOG.info("RefreshMetaProcedure completed successfully. All follow-up
actions finished.");
+ currentRegions = null;
+ latestRegions = null;
+ pendingMutations = null;
+ deletedTables = null;
+ newlyAddedRegions = null;
+ return Flow.NO_MORE_STATE;
+ }
+
+ private void invalidateTableDescriptorCache(MasterProcedureEnv env) {
+ LOG.debug("Invalidating the table descriptor cache to ensure new tables
are discovered");
+
env.getMasterServices().getTableDescriptors().invalidateTableDescriptorCache();
+ }
+
+ /**
+ * Prepares mutations by comparing the current regions in hbase:meta with
the latest regions from
+ * backing storage. Also populates newlyAddedRegions and deletedTables lists
for follow-up
+ * actions.
+ * @param currentMap Current regions from hbase:meta
+ * @param latestMap Latest regions from backing storage
+ * @return List of mutations to apply to the meta table
+ * @throws IOException If there is an error creating mutations
+ */
+ private List<Mutation> prepareMutations(Map<String, RegionInfo> currentMap,
+ Map<String, RegionInfo> latestMap) throws IOException {
+ List<Mutation> mutations = new ArrayList<>();
+
+ for (String regionId : Stream.concat(currentMap.keySet().stream(),
latestMap.keySet().stream())
+ .collect(Collectors.toSet())) {
+ RegionInfo currentRegion = currentMap.get(regionId);
+ RegionInfo latestRegion = latestMap.get(regionId);
+
+ if (latestRegion != null) {
+ if (currentRegion == null || hasBoundaryChanged(currentRegion,
latestRegion)) {
+ mutations.add(MetaTableAccessor.makePutFromRegionInfo(latestRegion));
+ newlyAddedRegions.add(latestRegion);
+ }
+ } else {
+ mutations.add(MetaTableAccessor.makeDeleteFromRegionInfo(currentRegion,
+ EnvironmentEdgeManager.currentTime()));
+ }
+ }
+
+ if (!currentMap.isEmpty() || !latestMap.isEmpty()) {
+ Set<TableName> currentTables =
+
currentMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
+ Set<TableName> latestTables =
+
latestMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
+
+ Set<TableName> tablesToDeleteState = new HashSet<>(currentTables);
+ tablesToDeleteState.removeAll(latestTables);
+ if (!tablesToDeleteState.isEmpty()) {
+ LOG.warn(
+ "The following tables have no regions on storage and WILL BE REMOVED
from the meta: {}",
+ tablesToDeleteState);
+ this.deletedTables.addAll(tablesToDeleteState);
+ }
+
+ Set<TableName> tablesToRestoreState = new HashSet<>(latestTables);
+ tablesToRestoreState.removeAll(currentTables);
+ if (!tablesToRestoreState.isEmpty()) {
+ LOG.info("Adding missing table:state entry for recovered tables: {}",
tablesToRestoreState);
+ for (TableName tableName : tablesToRestoreState) {
+ TableState tableState = new TableState(tableName,
TableState.State.ENABLED);
+ mutations.add(MetaTableAccessor.makePutFromTableState(tableState,
+ EnvironmentEdgeManager.currentTime()));
+ }
+ }
+ }
+ return mutations;
+ }
+
+ private void applyMutations(Connection connection, List<Mutation> mutations)
throws IOException {
+ List<List<Mutation>> chunks = Lists.partition(mutations,
MUTATION_BATCH_SIZE);
+
+ for (int i = 0; i < chunks.size(); i++) {
+ List<Mutation> chunk = chunks.get(i);
+
+ List<Put> puts =
+ chunk.stream().filter(m -> m instanceof Put).map(m -> (Put)
m).collect(Collectors.toList());
+
+ List<Delete> deletes = chunk.stream().filter(m -> m instanceof
Delete).map(m -> (Delete) m)
+ .collect(Collectors.toList());
+
+ if (!puts.isEmpty()) {
+ MetaTableAccessor.putsToMetaTable(connection, puts);
+ }
+ if (!deletes.isEmpty()) {
+ MetaTableAccessor.deleteFromMetaTable(connection, deletes);
+ }
+ LOG.debug("Successfully processed batch {}/{}", i + 1, chunks.size());
+ }
+ }
+
+ boolean hasBoundaryChanged(RegionInfo region1, RegionInfo region2) {
+ return !Arrays.equals(region1.getStartKey(), region2.getStartKey())
+ || !Arrays.equals(region1.getEndKey(), region2.getEndKey());
+ }
+
+ /**
+ * Scans the backing storage for all regions and returns a list of
RegionInfo objects. This method
+ * scans the filesystem for region directories and reads their .regioninfo
files.
+ * @param connection The HBase connection to use.
+ * @return List of RegionInfo objects found in the backing storage.
+ * @throws IOException If there is an error accessing the filesystem or
reading region info files.
+ */
+ List<RegionInfo> scanBackingStorage(Connection connection) throws
IOException {
+ List<RegionInfo> regions = new ArrayList<>();
+ Configuration conf = connection.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ Path rootDir = CommonFSUtils.getRootDir(conf);
+ Path dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
+
+ LOG.info("Scanning backing storage under: {}", dataDir);
+
+ if (!fs.exists(dataDir)) {
+ LOG.warn("Data directory does not exist: {}", dataDir);
+ return regions;
+ }
+
+ FileStatus[] namespaceDirs =
+ fs.listStatus(dataDir, path ->
!path.getName().matches(HIDDEN_DIR_PATTERN));
+ LOG.debug("Found {} namespace directories in data dir",
Arrays.stream(namespaceDirs).toList());
+
+ for (FileStatus nsDir : namespaceDirs) {
+ String namespaceName = nsDir.getPath().getName();
+ if (NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR.equals(namespaceName))
{
+ LOG.info("Skipping system namespace {}", namespaceName);
+ continue;
+ }
+ try {
+ List<RegionInfo> namespaceRegions = scanTablesInNamespace(fs,
nsDir.getPath());
+ regions.addAll(namespaceRegions);
+ LOG.debug("Found {} regions in namespace {}", namespaceRegions.size(),
+ nsDir.getPath().getName());
+ } catch (IOException e) {
+ LOG.error("Failed to scan namespace directory: {}", nsDir.getPath(),
e);
+ }
+ }
+ LOG.info("Scanned backing storage and found {} regions", regions.size());
+ return regions;
+ }
+
+ private List<RegionInfo> scanTablesInNamespace(FileSystem fs, Path
namespacePath)
+ throws IOException {
+ LOG.debug("Scanning namespace {}", namespacePath.getName());
+ List<Path> tableDirs = FSUtils.getLocalTableDirs(fs, namespacePath);
+
+ return tableDirs.parallelStream().flatMap(tableDir -> {
Review Comment:
Delegating into the common ForkJoinPool feels dicey here. I would feel safer
if this was a regular stream().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]