This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new da508f1dae4 HBASE-26974 Introduce a LogRollProcedure (#5408)
da508f1dae4 is described below
commit da508f1dae4bad4f3aa6074b8db44febebefa60e
Author: Ruanhui <[email protected]>
AuthorDate: Fri Sep 12 23:09:34 2025 +0800
HBASE-26974 Introduce a LogRollProcedure (#5408)
Co-authored-by: huiruan <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit ffed09d96bbaccdc83e1d7df66d640cf10b2f191)
---
.../hbase/backup/impl/FullTableBackupClient.java | 7 +-
.../backup/impl/IncrementalBackupManager.java | 10 +-
.../hadoop/hbase/backup/util/BackupUtils.java | 51 ++++++
.../apache/hadoop/hbase/backup/TestBackupBase.java | 7 +-
.../hadoop/hbase/backup/TestBackupMerge.java | 19 +--
.../java/org/apache/hadoop/hbase/client/Admin.java | 10 ++
.../hadoop/hbase/client/AdminOverAsyncAdmin.java | 5 +
.../org/apache/hadoop/hbase/client/AsyncAdmin.java | 9 ++
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 +
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 138 ++++++++++++----
.../hbase/shaded/protobuf/RequestConverter.java | 6 +
.../org/apache/hadoop/hbase/util/FutureUtils.java | 2 +-
.../procedure2/RemoteProcedureDispatcher.java | 2 +-
.../src/main/protobuf/HBase.proto | 4 +
.../src/main/protobuf/server/master/Master.proto | 12 ++
.../protobuf/server/master/MasterProcedure.proto | 18 +++
.../server/master/RegionServerStatus.proto | 1 +
.../src/main/protobuf/server/region/Admin.proto | 1 -
.../apache/hadoop/hbase/executor/EventType.java | 8 +-
.../apache/hadoop/hbase/executor/ExecutorType.java | 3 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 24 ++-
.../hadoop/hbase/master/MasterRpcServices.java | 19 ++-
.../apache/hadoop/hbase/master/MasterServices.java | 6 +
.../apache/hadoop/hbase/master/ServerManager.java | 8 +
.../assignment/RegionRemoteProcedureBase.java | 2 +-
.../assignment/RegionTransitionProcedure.java | 2 +-
.../master/procedure/FlushRegionProcedure.java | 2 +-
.../hbase/master/procedure/LogRollProcedure.java | 178 +++++++++++++++++++++
.../master/procedure/LogRollRemoteProcedure.java | 113 +++++++++++++
.../master/procedure/ServerProcedureInterface.java | 5 +
.../hadoop/hbase/master/procedure/ServerQueue.java | 1 +
.../master/procedure/ServerRemoteProcedure.java | 3 +-
.../master/procedure/SnapshotRegionProcedure.java | 2 +-
.../hbase/procedure2/BaseRSProcedureCallable.java | 7 +-
.../hbase/procedure2/RSProcedureCallable.java | 2 +-
.../hbase/regionserver/FlushRegionCallable.java | 3 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 11 +-
.../hadoop/hbase/regionserver/LogRollCallable.java | 84 ++++++++++
.../hadoop/hbase/regionserver/RSRpcServices.java | 2 +-
.../hbase/regionserver/ReloadQuotasCallable.java | 3 +-
.../RemoteProcedureResultReporter.java | 7 +-
.../hbase/regionserver/SnapshotRegionCallable.java | 3 +-
.../hbase/regionserver/SnapshotVerifyCallable.java | 3 +-
.../hbase/regionserver/SplitWALCallable.java | 3 +-
.../regionserver/handler/RSProcedureHandler.java | 8 +-
.../ClaimReplicationQueueCallable.java | 3 +-
.../regionserver/RefreshPeerCallable.java | 3 +-
.../ReplaySyncReplicationWALCallable.java | 3 +-
.../SwitchRpcThrottleRemoteCallable.java | 3 +-
.../apache/hadoop/hbase/wal/AbstractWALRoller.java | 2 +-
.../hbase/master/MockNoopMasterServices.java | 5 +
.../master/procedure/TestLogRollProcedure.java | 104 ++++++++++++
.../procedure/TestServerRemoteProcedure.java | 3 +-
.../TestRegisterPeerWorkerWhenRestarting.java | 4 +-
.../hbase/rsgroup/VerifyingRSGroupAdmin.java | 5 +
hbase-shell/src/main/ruby/hbase/admin.rb | 6 +
hbase-shell/src/main/ruby/shell.rb | 1 +
.../src/main/ruby/shell/commands/wal_roll_all.rb | 37 +++++
.../hadoop/hbase/thrift2/client/ThriftAdmin.java | 4 +
59 files changed, 901 insertions(+), 101 deletions(-)
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index f21ced9bf2f..2293fd4f814 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -25,7 +25,6 @@ import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CON
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -158,10 +156,7 @@ public class FullTableBackupClient extends
TableBackupClient {
// snapshots for the same reason as the log rolls.
List<BulkLoad> bulkLoadsToDelete =
backupManager.readBulkloadRows(tableList);
- Map<String, String> props = new HashMap<>();
- props.put("backupRoot", backupInfo.getBackupRootDir());
-
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
- LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+ BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
newTimestamps = backupManager.readRegionServerLastLogRollResult();
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index c92c0747e83..20884edf836 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -29,9 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -84,13 +81,8 @@ public class IncrementalBackupManager extends BackupManager {
}
LOG.info("Execute roll log procedure for incremental backup ...");
- HashMap<String, String> props = new HashMap<>();
- props.put("backupRoot", backupInfo.getBackupRootDir());
+ BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
- try (Admin admin = conn.getAdmin()) {
-
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
- LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
- }
newTimestamps = readRegionServerLastLogRollResult();
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps,
conf, savedStartCode);
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 15159ed73e4..183cc2054f1 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -65,6 +67,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
@@ -770,4 +773,52 @@ public final class BackupUtils {
return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
}
+ /**
+ * roll WAL writer for all region servers and record the newest log roll
result
+ */
+ public static void logRoll(Connection conn, String backupRootDir,
Configuration conf)
+ throws IOException {
+ boolean legacy = conf.getBoolean("hbase.backup.logroll.legacy.used",
false);
+ if (legacy) {
+ logRollV1(conn, backupRootDir);
+ } else {
+ logRollV2(conn, backupRootDir);
+ }
+ }
+
+ private static void logRollV1(Connection conn, String backupRootDir) throws
IOException {
+ try (Admin admin = conn.getAdmin()) {
+
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+ LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME,
+ ImmutableMap.of("backupRoot", backupRootDir));
+ }
+ }
+
+ private static void logRollV2(Connection conn, String backupRootDir) throws
IOException {
+ BackupSystemTable backupSystemTable = new BackupSystemTable(conn);
+ HashMap<String, Long> lastLogRollResult =
+ backupSystemTable.readRegionServerLastLogRollResult(backupRootDir);
+ try (Admin admin = conn.getAdmin()) {
+ Map<ServerName, Long> newLogRollResult = admin.rollAllWALWriters();
+
+ for (Map.Entry<ServerName, Long> entry : newLogRollResult.entrySet()) {
+ ServerName serverName = entry.getKey();
+ long newHighestWALFilenum = entry.getValue();
+
+ String address = serverName.getAddress().toString();
+ Long lastHighestWALFilenum = lastLogRollResult.get(address);
+ if (lastHighestWALFilenum != null && lastHighestWALFilenum >
newHighestWALFilenum) {
+ LOG.warn("Won't update last roll log result for server {}: current =
{}, new = {}",
+ serverName, lastHighestWALFilenum, newHighestWALFilenum);
+ } else {
+ backupSystemTable.writeRegionServerLastLogRollResult(address,
newHighestWALFilenum,
+ backupRootDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updated last roll log result for {} from {} to {}",
serverName,
+ lastHighestWALFilenum, newHighestWALFilenum);
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index b5f58508441..a14fce59faf 100644
---
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient;
-import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -239,10 +237,7 @@ public class TestBackupBase {
// the snapshot.
LOG.info("Execute roll log procedure for full backup ...");
- Map<String, String> props = new HashMap<>();
- props.put("backupRoot", backupInfo.getBackupRootDir());
-
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
- LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+ BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
failStageIf(Stage.stage_2);
newTimestamps = backupManager.readRegionServerLastLogRollResult();
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
index 38204f68e31..b9197632544 100644
---
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -70,17 +71,17 @@ public class TestBackupMerge extends TestBackupBase {
// #2 - insert some data to table1
Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
- LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+ LOG.debug("writing {} rows to {}", ADD_ROWS, table1);
- Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
+ Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH +
ADD_ROWS);
t1.close();
- LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+ LOG.debug("written {} rows to {}", ADD_ROWS, table1);
Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
- Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
+ Assert.assertEquals(HBaseTestingUtil.countRows(t2), NB_ROWS_IN_BATCH +
ADD_ROWS);
t2.close();
- LOG.debug("written " + ADD_ROWS + " rows to " + table2);
+ LOG.debug("written {} rows to {}", ADD_ROWS, table2);
// #3 - incremental backup for multiple tables
tables = Lists.newArrayList(table1, table2);
@@ -112,15 +113,15 @@ public class TestBackupMerge extends TestBackupBase {
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
Table hTable = conn.getTable(table1_restore);
- LOG.debug("After incremental restore: " + hTable.getDescriptor());
- int countRows = TEST_UTIL.countRows(hTable, famName);
- LOG.debug("f1 has " + countRows + " rows");
+ LOG.debug("After incremental restore: {}", hTable.getDescriptor());
+ int countRows = HBaseTestingUtil.countRows(hTable, famName);
+ LOG.debug("f1 has {} rows", countRows);
Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);
hTable.close();
hTable = conn.getTable(table2_restore);
- Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 *
ADD_ROWS);
+ Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH +
2 * ADD_ROWS);
hTable.close();
admin.close();
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 75dd2ef07b3..43a004a471c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -1404,6 +1404,16 @@ public interface Admin extends Abortable, Closeable {
*/
void rollWALWriter(ServerName serverName) throws IOException,
FailedLogCloseException;
+ /**
+ * Roll log writer for all RegionServers. Note that unlike
+ * {@link Admin#rollWALWriter(ServerName)}, this method is synchronous,
which means it will block
+ * until all RegionServers have completed the log roll, or a RegionServer
fails due to an
+ * exception that retry will not work.
+ * @return server and the highest wal filenum of server before performing
log roll
+ * @throws IOException if a remote or network exception occurs
+ */
+ Map<ServerName, Long> rollAllWALWriters() throws IOException;
+
/**
* Helper that delegates to getClusterMetrics().getMasterCoprocessorNames().
* @return an array of master coprocessors
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
index c13dfc33e3d..c866f434e63 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
@@ -635,6 +635,11 @@ class AdminOverAsyncAdmin implements Admin {
get(admin.rollWALWriter(serverName));
}
+ @Override
+ public Map<ServerName, Long> rollAllWALWriters() throws IOException {
+ return get(admin.rollAllWALWriters());
+ }
+
@Override
public CompactionState getCompactionState(TableName tableName) throws
IOException {
return get(admin.getCompactionState(tableName));
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 331aa4a254a..d808aecc815 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -1270,6 +1270,15 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> rollWALWriter(ServerName serverName);
+ /**
+ * Roll log writer for all RegionServers. Note that unlike
+ * {@link Admin#rollWALWriter(ServerName)}, this method is synchronous,
which means it will block
+ * until all RegionServers have completed the log roll, or a RegionServer
fails due to an
+ * exception that retry will not work.
+ * @return server and the highest wal filenum of server before performing
log roll
+ */
+ CompletableFuture<Map<ServerName, Long>> rollAllWALWriters();
+
/**
* Clear compacting queues on a region server.
* @param serverName The servername of the region server.
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 69f35360003..33ac47c73d6 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -691,6 +691,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.rollWALWriter(serverName));
}
+ @Override
+ public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
+ return wrap(rawAdmin.rollAllWALWriters());
+ }
+
@Override
public CompletableFuture<Void> clearCompactionQueues(ServerName serverName,
Set<String> queues) {
return wrap(rawAdmin.clearCompactionQueues(serverName, queues));
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 79adce33a13..d7501ccedd8 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -105,6 +105,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
@@ -149,6 +150,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerR
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -263,6 +265,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.Recommissi
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
@@ -497,28 +501,70 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
+ /**
+ * short-circuit call for
+ * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter,
Converter, ProcedureBiConsumer)}
+ * by ignoring procedure result
+ */
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
- ProcedureBiConsumer consumer) {
+ ProcedureBiConsumer<Void> consumer) {
+ return procedureCall(preq, rpcCall, respConverter, result -> null,
consumer);
+ }
+
+ /**
+ * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall,
Converter, Converter,
+ * ProcedureBiConsumer) by skip setting priority for request
+ */
+ private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq,
+ MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+ Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES>
consumer) {
return procedureCall(b -> {
- }, preq, rpcCall, respConverter, consumer);
+ }, preq, rpcCall, respConverter, resultConverter, consumer);
}
+ /**
+ * short-circuit call for procedureCall(TableName, Object, MasterRpcCall,
Converter, Converter,
+ * ProcedureBiConsumer) by ignoring procedure result
+ */
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName
tableName, PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
- ProcedureBiConsumer consumer) {
- return procedureCall(b -> b.priority(tableName), preq, rpcCall,
respConverter, consumer);
+ ProcedureBiConsumer<Void> consumer) {
+ return procedureCall(tableName, preq, rpcCall, respConverter, result ->
null, consumer);
+ }
+
+ /**
+ * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall,
Converter, Converter,
+ * ProcedureBiConsumer) by skip setting priority for request
+ */
+ private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName
tableName, PREQ preq,
+ MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
+ Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES>
consumer) {
+ return procedureCall(b -> b.priority(tableName), preq, rpcCall,
respConverter, resultConverter,
+ consumer);
}
- private <PREQ, PRESP> CompletableFuture<Void> procedureCall(
+ /**
+ * @param <PREQ> type of request
+ * @param <PRESP> type of response
+ * @param <PRES> type of procedure call result
+ * @param prioritySetter prioritySetter set priority by table for request
+ * @param preq procedure call request
+ * @param rpcCall procedure rpc call
+ * @param respConverter extract proc id from procedure call response
+ * @param resultConverter extract result from procedure call result
+ * @param consumer action performs on result
+ * @return procedure call result, null if procedure is void
+ */
+ private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(
Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq,
MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
- ProcedureBiConsumer consumer) {
- MasterRequestCallerBuilder<Long> builder = this.<Long>
newMasterCaller().action((controller,
- stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
respConverter));
+ Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES>
consumer) {
+ MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller()
+ .action((controller, stub) -> this.call(controller, stub, preq, rpcCall,
respConverter));
prioritySetter.accept(builder);
CompletableFuture<Long> procFuture = builder.call();
- CompletableFuture<Void> future = waitProcedureResult(procFuture);
+ CompletableFuture<PRES> future = waitProcedureResult(procFuture,
resultConverter);
addListener(future, consumer);
return future;
}
@@ -1932,7 +1978,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return failedFuture(new ReplicationException("tableCfs is null"));
}
- CompletableFuture<Void> future = new CompletableFuture<Void>();
+ CompletableFuture<Void> future = new CompletableFuture<>();
addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig =
@@ -1954,7 +2000,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return failedFuture(new ReplicationException("tableCfs is null"));
}
- CompletableFuture<Void> future = new CompletableFuture<Void>();
+ CompletableFuture<Void> future = new CompletableFuture<>();
addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig = null;
@@ -2053,7 +2099,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private void waitSnapshotFinish(SnapshotDescription snapshot,
CompletableFuture<Void> future,
SnapshotResponse resp) {
if (resp.hasProcId()) {
- getProcedureResult(resp.getProcId(), future, 0);
+ getProcedureResult(resp.getProcId(), src -> null, future, 0);
addListener(future, new
SnapshotProcedureBiConsumer(snapshot.getTableName()));
} else {
long expectedTimeout = resp.getExpectedTimeout();
@@ -2269,7 +2315,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.action((controller, stub) -> this.<RestoreSnapshotRequest,
RestoreSnapshotResponse,
Long> call(controller, stub, builder.build(),
(s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) ->
resp.getProcId()))
- .call());
+ .call(), result -> null);
}
@Override
@@ -2681,14 +2727,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private static abstract class ProcedureBiConsumer implements
BiConsumer<Void, Throwable> {
+ private static abstract class ProcedureBiConsumer<T> implements
BiConsumer<T, Throwable> {
abstract void onFinished();
abstract void onError(Throwable error);
@Override
- public void accept(Void v, Throwable error) {
+ public void accept(T value, Throwable error) {
if (error != null) {
onError(error);
return;
@@ -2697,7 +2743,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private static abstract class TableProcedureBiConsumer extends
ProcedureBiConsumer {
+ private static abstract class TableProcedureBiConsumer extends
ProcedureBiConsumer<Void> {
protected final TableName tableName;
TableProcedureBiConsumer(TableName tableName) {
@@ -2722,7 +2768,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private static abstract class NamespaceProcedureBiConsumer extends
ProcedureBiConsumer {
+ private static abstract class NamespaceProcedureBiConsumer extends
ProcedureBiConsumer<Void> {
protected final String namespaceName;
NamespaceProcedureBiConsumer(String namespaceName) {
@@ -2737,12 +2783,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
void onFinished() {
- LOG.info(getDescription() + " completed");
+ LOG.info("{} completed", getDescription());
}
@Override
void onError(Throwable error) {
- LOG.info(getDescription() + " failed with " + error.getMessage());
+ LOG.info("{} failed with {}", getDescription(), error.getMessage());
}
}
@@ -2981,7 +3027,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
}
- private static class ReplicationProcedureBiConsumer extends
ProcedureBiConsumer {
+ private static class ReplicationProcedureBiConsumer extends
ProcedureBiConsumer<Void> {
private final String peerId;
private final Supplier<String> getOperation;
@@ -2996,28 +3042,44 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
void onFinished() {
- LOG.info(getDescription() + " completed");
+ LOG.info("{} completed", getDescription());
}
@Override
void onError(Throwable error) {
- LOG.info(getDescription() + " failed with " + error.getMessage());
+ LOG.info("{} failed with {}", getDescription(), error.getMessage());
}
}
- private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long>
procFuture) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ private static final class RollAllWALWritersBiConsumer
+ extends ProcedureBiConsumer<Map<ServerName, Long>> {
+
+ @Override
+ void onFinished() {
+ LOG.info("Rolling all WAL writers completed");
+ }
+
+ @Override
+ void onError(Throwable error) {
+ LOG.warn("Rolling all WAL writers failed with {}", error.getMessage());
+ }
+ }
+
+ private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long>
procFuture,
+ Converter<T, ByteString> converter) {
+ CompletableFuture<T> future = new CompletableFuture<>();
addListener(procFuture, (procId, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
- getProcedureResult(procId, future, 0);
+ getProcedureResult(procId, converter, future, 0);
});
return future;
}
- private void getProcedureResult(long procId, CompletableFuture<Void> future,
int retries) {
+ private <T> void getProcedureResult(long procId, Converter<T, ByteString>
converter,
+ CompletableFuture<T> future, int retries) {
addListener(
this.<GetProcedureResultResponse> newMasterCaller()
.action((controller, stub) -> this.<GetProcedureResultRequest,
GetProcedureResultResponse,
@@ -3029,12 +3091,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (error != null) {
LOG.warn("failed to get the procedure result procId={}", procId,
ConnectionUtils.translateException(error));
- retryTimer.newTimeout(t -> getProcedureResult(procId, future,
retries + 1),
+ retryTimer.newTimeout(t -> getProcedureResult(procId, converter,
future, retries + 1),
ConnectionUtils.getPauseTime(pauseNs, retries),
TimeUnit.NANOSECONDS);
return;
}
if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
- retryTimer.newTimeout(t -> getProcedureResult(procId, future,
retries + 1),
+ retryTimer.newTimeout(t -> getProcedureResult(procId, converter,
future, retries + 1),
ConnectionUtils.getPauseTime(pauseNs, retries),
TimeUnit.NANOSECONDS);
return;
}
@@ -3042,7 +3104,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
IOException ioe =
ForeignExceptionUtil.toIOException(response.getException());
future.completeExceptionally(ioe);
} else {
- future.complete(null);
+ try {
+ future.complete(converter.convert(response.getResult()));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
}
});
}
@@ -3185,6 +3251,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.serverName(serverName).call();
}
+ @Override
+ public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() {
+ return this
+ .<RollAllWALWritersRequest, RollAllWALWritersResponse,
+ Map<ServerName,
+ Long>> procedureCall(
+ RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(),
ng.newNonce()),
+ (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp ->
resp.getProcId(),
+ result ->
LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap()
+ .entrySet().stream().collect(Collectors
+ .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()),
Map.Entry::getValue)),
+ new RollAllWALWritersBiConsumer());
+ }
+
@Override
public CompletableFuture<Void> clearCompactionQueues(ServerName serverName,
Set<String> queues) {
return this.<Void> newAdminCaller()
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 3bbfac500ce..37fdb1ba6fe 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -139,6 +139,7 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
@@ -860,6 +861,11 @@ public final class RequestConverter {
return RollWALWriterRequest.getDefaultInstance();
}
+ public static RollAllWALWritersRequest buildRollAllWALWritersRequest(long
nonceGroup,
+ long nonce) {
+ return
RollAllWALWritersRequest.newBuilder().setNonceGroup(nonceGroup).setNonce(nonce).build();
+ }
+
/**
* Create a new GetServerInfoRequest
* @return a GetServerInfoRequest
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index 4f8a7320fb4..37292d5feef 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -65,7 +65,7 @@ public final class FutureUtils {
try {
// See this post on stack overflow(shorten since the url is too long),
// https://s.apache.org/completionexception
- // For a chain of CompleableFuture, only the first child
CompletableFuture can get the
+ // For a chain of CompletableFuture, only the first child
CompletableFuture can get the
// original exception, others will get a CompletionException, which
wraps the original
// exception. So here we unwrap it before passing it to the callback
action.
action.accept(resp, unwrapCompletionException(error));
diff --git
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index e6a9d8fb2bd..6e68ce5f190 100644
---
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -262,7 +262,7 @@ public abstract class RemoteProcedureDispatcher<TEnv,
TRemote extends Comparable
* Called when RS tells the remote procedure is succeeded through the
* {@code reportProcedureDone} method.
*/
- void remoteOperationCompleted(TEnv env);
+ void remoteOperationCompleted(TEnv env, byte[] remoteResultData);
/**
* Called when RS tells the remote procedure is failed through the {@code
reportProcedureDone}
diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index 0fd3d667d4d..c66ee7eb979 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -289,3 +289,7 @@ message RotateFileData {
required int64 timestamp = 1;
required bytes data = 2;
}
+
+message LastHighestWalFilenum {
+ map<string, uint64> file_num = 1;
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index a8adaa27453..768a1d7544e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -799,6 +799,15 @@ message ModifyColumnStoreFileTrackerResponse {
message FlushMasterStoreRequest {}
message FlushMasterStoreResponse {}
+message RollAllWALWritersRequest {
+ optional uint64 nonce_group = 1 [default = 0];
+ optional uint64 nonce = 2 [default = 0];
+}
+
+message RollAllWALWritersResponse {
+ optional uint64 proc_id = 1;
+}
+
service MasterService {
/** Used by the client to get the number of regions that have received the
updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1270,6 +1279,9 @@ service MasterService {
rpc FlushTable(FlushTableRequest)
returns(FlushTableResponse);
+
+ rpc rollAllWALWriters(RollAllWALWritersRequest)
+ returns(RollAllWALWritersResponse);
}
// HBCK Service definitions.
diff --git
a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index e3b43afd66a..554d7ec9c41 100644
---
a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++
b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -839,3 +839,21 @@ message ReloadQuotasProcedureStateData {
required ServerName target_server = 1;
optional ForeignExceptionMessage error = 2;
}
+
+enum LogRollProcedureState {
+ LOG_ROLL_ROLL_LOG_ON_RS = 1;
+ LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM = 2;
+ LOG_ROLL_UNREGISTER_SERVER_LISTENER = 3;
+}
+
+message LogRollRemoteProcedureStateData {
+ required ServerName target_server = 1;
+}
+
+message RSLogRollParameter {
+}
+
+message LogRollRemoteProcedureResult {
+ optional ServerName server_name = 1;
+ optional uint64 last_highest_wal_filenum = 2;
+}
diff --git
a/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto
b/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto
index e68ba8e7286..3d2d8c6ff5f 100644
---
a/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto
+++
b/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto
@@ -160,6 +160,7 @@ message RemoteProcedureResult {
optional ForeignExceptionMessage error = 3;
// Master active time as fencing token
optional int64 initiating_master_active_time = 4;
+ optional bytes proc_result_data = 5;
}
message ReportProcedureDoneRequest {
repeated RemoteProcedureResult result = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 230795f2747..30eb328fd3c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -420,5 +420,4 @@ service AdminService {
rpc GetCachedFilesList(GetCachedFilesListRequest)
returns(GetCachedFilesListResponse);
-
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index fce32333577..fee132b7a4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -303,7 +303,13 @@ public enum EventType {
* RS reload quotas.<br>
* RS_RELOAD_QUOTAS
*/
- RS_RELOAD_QUOTAS(90, ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS);
+ RS_RELOAD_QUOTAS(90, ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS),
+
+ /**
+ * RS log roll.<br>
+ * RS_LOG_ROLL
+ */
+ RS_LOG_ROLL(91, ExecutorType.RS_LOG_ROLL);
private final int code;
private final ExecutorType executor;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 1d689d276aa..668cd701c0d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -56,7 +56,8 @@ public enum ExecutorType {
RS_CLAIM_REPLICATION_QUEUE(35),
RS_SNAPSHOT_OPERATIONS(36),
RS_FLUSH_OPERATIONS(37),
- RS_RELOAD_QUOTAS_OPERATIONS(38);
+ RS_RELOAD_QUOTAS_OPERATIONS(38),
+ RS_LOG_ROLL(39);
ExecutorType(int value) {
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1cda553a81d..6f235b2156f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -160,6 +160,7 @@ import
org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
+import org.apache.hadoop.hbase.master.procedure.LogRollProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
@@ -4201,11 +4202,11 @@ public class HMaster extends
HBaseServerBase<MasterRpcServices> implements Maste
return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
}
- public void remoteProcedureCompleted(long procId) {
+ public void remoteProcedureCompleted(long procId, byte[] remoteResultData) {
LOG.debug("Remote procedure done, pid={}", procId);
RemoteProcedure<MasterProcedureEnv, ?> procedure =
getRemoteProcedure(procId);
if (procedure != null) {
- procedure.remoteOperationCompleted(procedureExecutor.getEnvironment());
+ procedure.remoteOperationCompleted(procedureExecutor.getEnvironment(),
remoteResultData);
}
}
@@ -4539,7 +4540,7 @@ public class HMaster extends
HBaseServerBase<MasterRpcServices> implements Maste
@Override
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
- LOG.info(getClientIdAuditPrefix() + " flush " + tableName);
+ LOG.info("{} flush {}", getClientIdAuditPrefix(), tableName);
submitProcedure(
new FlushTableProcedure(procedureExecutor.getEnvironment(),
tableName, columnFamilies));
getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
@@ -4551,4 +4552,21 @@ public class HMaster extends
HBaseServerBase<MasterRpcServices> implements Maste
}
});
}
+
+ @Override
+ public long rollAllWALWriters(long nonceGroup, long nonce) throws
IOException {
+ return MasterProcedureUtil
+ .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
nonceGroup, nonce) {
+ @Override
+ protected void run() {
+ LOG.info("{} roll all wal writers", getClientIdAuditPrefix());
+ submitProcedure(new LogRollProcedure());
+ }
+
+ @Override
+ protected String getDescription() {
+ return "RollAllWALWriters";
+ }
+ });
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index fc246d38d51..de911b54ee9 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -321,6 +321,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.Recommissi
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
@@ -1372,7 +1374,7 @@ public class MasterRpcServices extends
HBaseRpcServicesBase<HMaster>
@Override
public GetProcedureResultResponse getProcedureResult(RpcController
controller,
GetProcedureResultRequest request) throws ServiceException {
- LOG.debug("Checking to see if procedure is done pid=" +
request.getProcId());
+ LOG.debug("Checking to see if procedure is done pid={}",
request.getProcId());
try {
server.checkInitialized();
GetProcedureResultResponse.Builder builder =
GetProcedureResultResponse.newBuilder();
@@ -2575,7 +2577,9 @@ public class MasterRpcServices extends
HBaseRpcServicesBase<HMaster>
}
request.getResultList().forEach(result -> {
if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
- server.remoteProcedureCompleted(result.getProcId());
+ byte[] remoteResultData =
+ result.hasProcResultData() ?
result.getProcResultData().toByteArray() : null;
+ server.remoteProcedureCompleted(result.getProcId(), remoteResultData);
} else {
server.remoteProcedureFailed(result.getProcId(),
RemoteProcedureException.fromProto(result.getError()));
@@ -3662,4 +3666,15 @@ public class MasterRpcServices extends
HBaseRpcServicesBase<HMaster>
throw new ServiceException(ioe);
}
}
+
+ @Override
+ public RollAllWALWritersResponse rollAllWALWriters(RpcController
rpcController,
+ RollAllWALWritersRequest request) throws ServiceException {
+ try {
+ long procId = server.rollAllWALWriters(request.getNonceGroup(),
request.getNonce());
+ return RollAllWALWritersResponse.newBuilder().setProcId(procId).build();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index e9c98d62446..0573b1a7562 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -515,4 +515,10 @@ public interface MasterServices extends Server {
* @return procedure Id
*/
long truncateRegion(RegionInfo regionInfo, long nonceGroup, long nonce)
throws IOException;
+
+ /**
+ * Roll WAL writer for all RegionServers
+ * @return procedure id
+ */
+ long rollAllWALWriters(long nonceGroup, long nonce) throws IOException;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 55cfc28bb53..b99f0448e8f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -236,6 +236,14 @@ public class ServerManager implements
ConfigurationObserver {
return this.listeners.remove(listener);
}
+ /**
+ * Removes all of the ServerListeners of this collection that satisfy the
given predicate.
+ * @param filter a predicate which returns true for ServerListener to be
removed
+ */
+ public boolean unregisterListenerIf(final Predicate<ServerListener> filter) {
+ return this.listeners.removeIf(filter);
+ }
+
/**
* Let the server manager know a new regionserver has come online
* @param request the startup request
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
index a828b5b668f..cb3b91ca0e2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
@@ -103,7 +103,7 @@ public abstract class RegionRemoteProcedureBase extends
Procedure<MasterProcedur
newRemoteOperation(MasterProcedureEnv env);
@Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
+ public void remoteOperationCompleted(MasterProcedureEnv env, byte[]
remoteResultData) {
// should not be called since we use reportRegionStateTransition to report
the result
throw new UnsupportedOperationException();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index e0712f1d2aa..4cf685f50a0 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -166,7 +166,7 @@ public abstract class RegionTransitionProcedure extends
Procedure<MasterProcedur
}
@Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
+ public void remoteOperationCompleted(MasterProcedureEnv env, byte[]
remoteResultData) {
// should not be called for region operation until we modified the
open/close region procedure
throw new UnsupportedOperationException();
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
index 7c67f0e3ee9..af482aeff28 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
@@ -149,7 +149,7 @@ public class FlushRegionProcedure extends
Procedure<MasterProcedureEnv>
}
@Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
+ public void remoteOperationCompleted(MasterProcedureEnv env, byte[]
remoteResultData) {
complete(env, null);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java
new file mode 100644
index 00000000000..a61b2c4afa5
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollProcedureState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult;
+
+/**
+ * The procedure to perform WAL rolling on all of RegionServers.
+ */
[email protected]
+public class LogRollProcedure
+ extends StateMachineProcedure<MasterProcedureEnv, LogRollProcedureState>
+ implements GlobalProcedureInterface {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LogRollProcedure.class);
+
+ public LogRollProcedure() {
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env,
LogRollProcedureState state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ LOG.info("{} execute state={}", this, state);
+
+ final ServerManager serverManager =
env.getMasterServices().getServerManager();
+
+ try {
+ switch (state) {
+ case LOG_ROLL_ROLL_LOG_ON_RS:
+ // avoid potential new region server missing
+ serverManager.registerListener(new NewServerWALRoller(env));
+
+ final List<LogRollRemoteProcedure> subProcedures =
+
serverManager.getOnlineServersList().stream().map(LogRollRemoteProcedure::new).toList();
+ addChildProcedure(subProcedures.toArray(new
LogRollRemoteProcedure[0]));
+
setNextState(LogRollProcedureState.LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM);
+ return Flow.HAS_MORE_STATE;
+ case LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM:
+ // get children procedure
+ List<LogRollRemoteProcedure> children =
+
env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> p instanceof LogRollRemoteProcedure)
+ .filter(p -> p.getParentProcId() == getProcId()).map(p ->
(LogRollRemoteProcedure) p)
+ .toList();
+ LastHighestWalFilenum.Builder builder =
LastHighestWalFilenum.newBuilder();
+ for (Procedure<MasterProcedureEnv> child : children) {
+ LogRollRemoteProcedureResult result =
+ LogRollRemoteProcedureResult.parseFrom(child.getResult());
+
builder.putFileNum(ProtobufUtil.toServerName(result.getServerName()).toString(),
+ result.getLastHighestWalFilenum());
+ }
+ setResult(builder.build().toByteArray());
+
setNextState(LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER);
+ return Flow.HAS_MORE_STATE;
+ case LOG_ROLL_UNREGISTER_SERVER_LISTENER:
+ serverManager.unregisterListenerIf(l -> l instanceof
NewServerWALRoller);
+ return Flow.NO_MORE_STATE;
+ }
+ } catch (Exception e) {
+ setFailure("log-roll", e);
+ }
+ return Flow.NO_MORE_STATE;
+ }
+
+ @Override
+ public String getGlobalId() {
+ return getClass().getSimpleName();
+ }
+
+ private static final class NewServerWALRoller implements ServerListener {
+
+ private final MasterProcedureEnv env;
+
+ public NewServerWALRoller(MasterProcedureEnv env) {
+ this.env = env;
+ }
+
+ @Override
+ public void serverAdded(ServerName server) {
+ env.getMasterServices().getMasterProcedureExecutor()
+ .submitProcedure(new LogRollRemoteProcedure(server));
+ }
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, LogRollProcedureState
state) {
+ // nothing to rollback
+ }
+
+ @Override
+ protected LogRollProcedureState getState(int stateId) {
+ return LogRollProcedureState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(LogRollProcedureState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected LogRollProcedureState getInitialState() {
+ return LogRollProcedureState.LOG_ROLL_ROLL_LOG_ON_RS;
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ super.serializeStateData(serializer);
+
+ if (getResult() != null && getResult().length > 0) {
+ serializer.serialize(LastHighestWalFilenum.parseFrom(getResult()));
+ } else {
+ serializer.serialize(LastHighestWalFilenum.getDefaultInstance());
+ }
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ super.deserializeStateData(serializer);
+
+ if (getResult() == null) {
+ LastHighestWalFilenum lastHighestWalFilenum =
+ serializer.deserialize(LastHighestWalFilenum.class);
+ if (lastHighestWalFilenum != null) {
+ if (
+ lastHighestWalFilenum.getFileNumMap().isEmpty()
+ && getCurrentState() ==
LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER
+ ) {
+ LOG.warn("pid = {}, current state is the last state, but
rsHighestWalFilenumMap is "
+ + "empty, this should not happen. Are all region servers down ?",
getProcId());
+ } else {
+ setResult(lastHighestWalFilenum.toByteArray());
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void toStringClassDetails(StringBuilder sb) {
+ sb.append(getClass().getSimpleName());
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java
new file mode 100644
index 00000000000..df8e02ed601
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.ServerName;
+import
org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.regionserver.LogRollCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureStateData;
+
+/**
+ * The remote procedure to perform WAL rolling on the specific RegionServer
without retrying.
+ */
[email protected]
+public class LogRollRemoteProcedure extends ServerRemoteProcedure
+ implements ServerProcedureInterface {
+
+ public LogRollRemoteProcedure() {
+ }
+
+ public LogRollRemoteProcedure(ServerName targetServer) {
+ this.targetServer = targetServer;
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ serializer.serialize(LogRollRemoteProcedureStateData.newBuilder()
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ LogRollRemoteProcedureStateData data =
+ serializer.deserialize(LogRollRemoteProcedureStateData.class);
+ this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ }
+
+ @Override
+ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName serverName) {
+ return Optional.of(new ServerOperation(this, getProcId(),
LogRollCallable.class,
+ LogRollRemoteProcedureStateData.getDefaultInstance().toByteArray(),
+ env.getMasterServices().getMasterActiveTime()));
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return targetServer;
+ }
+
+ @Override
+ public boolean hasMetaTableRegion() {
+ return false;
+ }
+
+ @Override
+ public ServerOperationType getServerOperationType() {
+ return ServerOperationType.LOG_ROLL;
+ }
+
+ @Override
+ protected boolean complete(MasterProcedureEnv env, Throwable error) {
+ // do not retry. just returns.
+ if (error != null) {
+ LOG.warn("Failed to roll wal for {}", targetServer, error);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public synchronized void remoteOperationCompleted(MasterProcedureEnv env,
+ byte[] remoteResultData) {
+ setResult(remoteResultData);
+ super.remoteOperationCompleted(env, remoteResultData);
+ }
+
+ @Override
+ protected void toStringClassDetails(StringBuilder sb) {
+ sb.append(getClass().getSimpleName()).append("
targetServer=").append(targetServer);
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index e73b23a3f96..b7ff6db67db 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -62,6 +62,11 @@ public interface ServerProcedureInterface {
* Re-read the hbase:quotas table and update {@link QuotaCache}.
*/
RELOAD_QUOTAS,
+
+ /**
+ * send roll log request to region server and handle the response
+ */
+ LOG_ROLL
}
/** Returns Name of this server instance. */
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 57912f41903..55920bd47b3 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -44,6 +44,7 @@ class ServerQueue extends Queue<ServerName> {
case CLAIM_REPLICATION_QUEUE_REMOTE:
case VERIFY_SNAPSHOT:
case RELOAD_QUOTAS:
+ case LOG_ROLL:
return false;
default:
break;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
index 0c89b639641..563961d765e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
@@ -123,7 +123,8 @@ public abstract class ServerRemoteProcedure extends
Procedure<MasterProcedureEnv
}
@Override
- public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+ public synchronized void remoteOperationCompleted(MasterProcedureEnv env,
+ byte[] remoteResultData) {
state =
MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED;
remoteOperationDone(env, null);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
index 05621767e7f..f4df40b168f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
@@ -108,7 +108,7 @@ public class SnapshotRegionProcedure extends
Procedure<MasterProcedureEnv>
}
@Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
+ public void remoteOperationCompleted(MasterProcedureEnv env, byte[]
remoteResultData) {
complete(env, null);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
index 68aac1ef6e2..7ea98d00cc7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java
@@ -28,12 +28,11 @@ public abstract class BaseRSProcedureCallable implements
RSProcedureCallable {
private Exception initError;
@Override
- public final Void call() throws Exception {
+ public final byte[] call() throws Exception {
if (initError != null) {
throw initError;
}
- doCall();
- return null;
+ return doCall();
}
@Override
@@ -46,7 +45,7 @@ public abstract class BaseRSProcedureCallable implements
RSProcedureCallable {
}
}
- protected abstract void doCall() throws Exception;
+ protected abstract byte[] doCall() throws Exception;
protected abstract void initParameter(byte[] parameter) throws Exception;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
index 635d2b6f87a..7ed9ff7664b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
@@ -26,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* A general interface for a sub procedure runs at RS side.
*/
@InterfaceAudience.Private
-public interface RSProcedureCallable extends Callable<Void> {
+public interface RSProcedureCallable extends Callable<byte[]> {
/**
* Initialize the callable
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java
index 3dd932a1736..e39317290bb 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java
@@ -43,7 +43,7 @@ public class FlushRegionCallable extends
BaseRSProcedureCallable {
private List<byte[]> columnFamilies;
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
HRegion region = rs.getRegion(regionInfo.getEncodedName());
if (region == null) {
throw new NotServingRegionException("region=" +
regionInfo.getRegionNameAsString());
@@ -64,6 +64,7 @@ public class FlushRegionCallable extends
BaseRSProcedureCallable {
LOG.debug("Closing region operation on {}", region);
region.closeRegionOperation();
}
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 350baca36f4..cd49ceb753e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1969,6 +1969,9 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
executorService.startExecutorService(
executorService.new
ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS)
.setCorePoolSize(rsRefreshQuotasThreads));
+ final int logRollThreads =
conf.getInt("hbase.regionserver.executor.log.roll.threads", 1);
+ executorService.startExecutorService(executorService.new ExecutorConfig()
+
.setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads));
Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler);
@@ -2203,7 +2206,7 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
*/
public void stop(final String msg, final boolean force, final User user) {
if (!this.stopped) {
- LOG.info("***** STOPPING region server '" + this + "' *****");
+ LOG.info("***** STOPPING region server '{}' *****", this);
if (this.rsHost != null) {
// when forced via abort don't allow CPs to override
try {
@@ -3551,9 +3554,9 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
.submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime,
callable));
}
- public void remoteProcedureComplete(long procId, long
initiatingMasterActiveTime,
- Throwable error) {
- procedureResultReporter.complete(procId, initiatingMasterActiveTime,
error);
+ public void remoteProcedureComplete(long procId, long
initiatingMasterActiveTime, Throwable error,
+ byte[] procResultData) {
+ procedureResultReporter.complete(procId, initiatingMasterActiveTime,
error, procResultData);
}
void reportProcedureDone(ReportProcedureDoneRequest request) throws
IOException {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java
new file mode 100644
index 00000000000..11dc28c2a68
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractWALRoller;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult;
+
[email protected]
+public class LogRollCallable extends BaseRSProcedureCallable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LogRollCallable.class);
+
+ private int maxRollRetry;
+
+ @Override
+ protected byte[] doCall() throws Exception {
+ for (int nAttempt = 0; nAttempt < maxRollRetry; nAttempt++) {
+ try {
+ Pair<Long, Long> filenumPairBefore = getFilenumPair();
+
+ rs.getWalRoller().requestRollAll();
+ rs.getWalRoller().waitUntilWalRollFinished();
+
+ Pair<Long, Long> filenumPairAfter = getFilenumPair();
+ LOG.info(
+ "Before rolling log, highest filenum = {} default WAL filenum = {},
After "
+ + "rolling log, highest filenum = {} default WAL filenum = {}",
+ filenumPairBefore.getFirst(), filenumPairBefore.getSecond(),
filenumPairAfter.getFirst(),
+ filenumPairAfter.getSecond());
+ return LogRollRemoteProcedureResult.newBuilder()
+ .setServerName(ProtobufUtil.toServerName(rs.getServerName()))
+
.setLastHighestWalFilenum(filenumPairBefore.getFirst()).build().toByteArray();
+ } catch (Exception e) {
+ LOG.warn("Failed rolling log on attempt={}", nAttempt, e);
+ if (nAttempt == maxRollRetry - 1) {
+ throw e;
+ }
+ }
+ }
+ return null;
+ }
+
+ private Pair<Long, Long> getFilenumPair() throws IOException {
+ long highestFilenum = rs.getWALs().stream()
+ .mapToLong(wal -> ((AbstractFSWAL<?>)
wal).getFilenum()).max().orElse(-1L);
+ long defaultWALFilenum = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum();
+ return Pair.newPair(highestFilenum, defaultWALFilenum);
+ }
+
+ @Override
+ protected void initParameter(byte[] parameter) throws Exception {
+ this.maxRollRetry =
rs.getConfiguration().getInt(AbstractWALRoller.WAL_ROLL_RETRIES, 1);
+ }
+
+ @Override
+ public EventType getEventType() {
+ return EventType.RS_LOG_ROLL;
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index bd232addcec..d325c67a82a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3966,7 +3966,7 @@ public class RSRpcServices extends
HBaseRpcServicesBase<HRegionServer>
LOG.warn("Failed to instantiating remote procedure {}, pid={}",
request.getProcClass(),
request.getProcId(), e);
server.remoteProcedureComplete(request.getProcId(),
request.getInitiatingMasterActiveTime(),
- e);
+ e, null);
return;
}
callable.init(request.getProcData().toByteArray(), server);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java
index e134dfda7ac..de23db37856 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java
@@ -29,9 +29,10 @@ public class ReloadQuotasCallable extends
BaseRSProcedureCallable {
private static final Logger LOG =
LoggerFactory.getLogger(ReloadQuotasCallable.class);
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
LOG.info("Reloading quotas");
rs.getRegionServerRpcQuotaManager().reload();
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
index 21016fe59dd..7fcf363a919 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
@@ -28,6 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
@@ -51,7 +52,8 @@ class RemoteProcedureResultReporter extends Thread {
this.server = server;
}
- public void complete(long procId, long initiatingMasterActiveTime, Throwable
error) {
+ public void complete(long procId, long initiatingMasterActiveTime, Throwable
error,
+ byte[] procReturnValue) {
RemoteProcedureResult.Builder builder =
RemoteProcedureResult.newBuilder().setProcId(procId)
.setInitiatingMasterActiveTime(initiatingMasterActiveTime);
if (error != null) {
@@ -62,6 +64,9 @@ class RemoteProcedureResultReporter extends Thread {
LOG.debug("Successfully complete execution of pid={}", procId);
builder.setStatus(RemoteProcedureResult.Status.SUCCESS);
}
+ if (procReturnValue != null) {
+ builder.setProcResultData(ByteString.copyFrom(procReturnValue));
+ }
results.add(builder.build());
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java
index 0693aee8750..7158671efb1 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java
@@ -41,7 +41,7 @@ public class SnapshotRegionCallable extends
BaseRSProcedureCallable {
private ForeignExceptionDispatcher monitor;
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
HRegion region = rs.getRegion(regionInfo.getEncodedName());
if (region == null) {
throw new NotServingRegionException(
@@ -78,6 +78,7 @@ public class SnapshotRegionCallable extends
BaseRSProcedureCallable {
LOG.debug("Closing snapshot operation on {}", region);
region.closeRegionOperation(Region.Operation.SNAPSHOT);
}
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java
index db7908d81be..76a3c1cf84e 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java
@@ -32,8 +32,9 @@ public class SnapshotVerifyCallable extends
BaseRSProcedureCallable {
private RegionInfo region;
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
rs.getRsSnapshotVerifier().verifyRegion(snapshot, region);
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
index 151c865db79..e6ae50f6e9a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
@@ -79,7 +79,7 @@ public class SplitWALCallable extends BaseRSProcedureCallable
{
}
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
// grab a lock
splitWALLock = splitWALLocks.acquireLock(walPath);
try {
@@ -97,6 +97,7 @@ public class SplitWALCallable extends BaseRSProcedureCallable
{
} finally {
splitWALLock.unlock();
}
+ return null;
}
public String getWalPath() {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
index 6eacc6b78e6..3e150144f2c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
@@ -51,14 +51,16 @@ public class RSProcedureHandler extends EventHandler {
@Override
public void process() {
Throwable error = null;
+ byte[] procResultData = null;
try {
MDC.put("pid", Long.toString(procId));
- callable.call();
+ procResultData = callable.call();
} catch (Throwable t) {
- LOG.error("pid=" + this.procId, t);
+ LOG.error("pid={}", this.procId, t);
error = t;
} finally {
- ((HRegionServer) server).remoteProcedureComplete(procId,
initiatingMasterActiveTime, error);
+ ((HRegionServer) server).remoteProcedureComplete(procId,
initiatingMasterActiveTime, error,
+ procResultData);
}
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
index 2b7e14f9f7a..73fa2976618 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java
@@ -39,9 +39,10 @@ public class ClaimReplicationQueueCallable extends
BaseRSProcedureCallable {
}
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
PeerProcedureHandler handler =
rs.getReplicationSourceService().getPeerProcedureHandler();
handler.claimReplicationQueue(queueId);
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index 094a61dcdd1..5d4454c1448 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -43,7 +43,7 @@ public class RefreshPeerCallable extends
BaseRSProcedureCallable {
private int stage;
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
LOG.info("Received a peer change event, peerId=" + peerId + ", type=" +
type);
PeerProcedureHandler handler =
rs.getReplicationSourceService().getPeerProcedureHandler();
switch (type) {
@@ -68,6 +68,7 @@ public class RefreshPeerCallable extends
BaseRSProcedureCallable {
default:
throw new IllegalArgumentException("Unknown peer modification type: "
+ type);
}
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index 427fe80b0c3..ed368e18981 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -69,7 +69,7 @@ public class ReplaySyncReplicationWALCallable extends
BaseRSProcedureCallable {
private final KeyLocker<String> peersLock = new KeyLocker<>();
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
LOG.info("Received a replay sync replication wals {} event, peerId={}",
wals, peerId);
if (rs.getReplicationSinkService() != null) {
Lock peerLock = peersLock.acquireLock(wals.get(0));
@@ -81,6 +81,7 @@ public class ReplaySyncReplicationWALCallable extends
BaseRSProcedureCallable {
peerLock.unlock();
}
}
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
index d09c821b9ed..fd35464e686 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java
@@ -34,8 +34,9 @@ public class SwitchRpcThrottleRemoteCallable extends
BaseRSProcedureCallable {
private boolean rpcThrottleEnabled;
@Override
- protected void doCall() throws Exception {
+ protected byte[] doCall() throws Exception {
rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled);
+ return null;
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index c900333af9e..5e645721134 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -69,7 +69,7 @@ public abstract class AbstractWALRoller<T extends Abortable>
extends Thread impl
* Configure for the max count of log rolling retry. The real retry count is
also limited by the
* timeout of log rolling via {@link #WAL_ROLL_WAIT_TIMEOUT}
*/
- protected static final String WAL_ROLL_RETRIES =
"hbase.regionserver.logroll.retries";
+ public static final String WAL_ROLL_RETRIES =
"hbase.regionserver.logroll.retries";
protected final ConcurrentMap<WAL, RollController> wals = new
ConcurrentHashMap<>();
protected final T abortable;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index e78ca7d0cdb..daaa2e5c2b9 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -568,4 +568,9 @@ public class MockNoopMasterServices implements
MasterServices {
long nonce) throws IOException {
return 0;
}
+
+ @Override
+ public long rollAllWALWriters(long nonceGroup, long nonce) throws
IOException {
+ return 0;
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java
new file mode 100644
index 00000000000..1b587097dda
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import static
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY;
+import static
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestLogRollProcedure {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLogRollProcedure.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+ private Configuration conf;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ conf.set(DISPATCH_DELAY_CONF_KEY, "2000");
+ conf.set(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, "128");
+ TEST_UTIL.startMiniCluster(2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testSimpleLogRoll() throws IOException {
+ HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+ long fileNumBefore = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum();
+
+ TEST_UTIL.getAdmin().rollAllWALWriters();
+
+ long fileNumAfter = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum();
+ assertTrue(fileNumAfter > fileNumBefore);
+ }
+
+ @Test
+ public void testMasterRestarts() throws IOException {
+ SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HRegionServer rs = cluster.getRegionServer(0);
+ long fileNumBefore = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum();
+
+ LogRollProcedure procedure = new LogRollProcedure();
+ long procId =
cluster.getMaster().getMasterProcedureExecutor().submitProcedure(procedure);
+
+ TEST_UTIL.waitFor(60000, () ->
cluster.getMaster().getMasterProcedureExecutor().getProcedures()
+ .stream().anyMatch(p -> p instanceof LogRollRemoteProcedure));
+ ServerName serverName = cluster.getMaster().getServerName();
+ cluster.killMaster(serverName);
+ cluster.waitForMasterToStop(serverName, 30000);
+ cluster.startMaster();
+ cluster.waitForActiveAndReadyMaster();
+
+ ProcedureExecutor<MasterProcedureEnv> exec =
cluster.getMaster().getMasterProcedureExecutor();
+ TEST_UTIL.waitFor(30000, () -> exec.isRunning() &&
exec.isFinished(procId));
+
+ long fileNumAfter = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum();
+
+ assertTrue(fileNumAfter > fileNumBefore);
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
index 1500a3c00cd..f828f5ce1ba 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
@@ -188,7 +188,8 @@ public class TestServerRemoteProcedure {
}
@Override
- public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+ public synchronized void remoteOperationCompleted(MasterProcedureEnv env,
+ byte[] remoteResultData) {
complete(env, null);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java
index 1c4abd15eaf..c0a37c20e88 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java
@@ -57,14 +57,14 @@ public class TestRegisterPeerWorkerWhenRestarting extends
SyncReplicationTestBas
}
@Override
- public void remoteProcedureCompleted(long procId) {
+ public void remoteProcedureCompleted(long procId, byte[] data) {
if (
FAIL && getMasterProcedureExecutor()
.getProcedure(procId) instanceof
SyncReplicationReplayWALRemoteProcedure
) {
throw new RuntimeException("Inject error");
}
- super.remoteProcedureCompleted(procId);
+ super.remoteProcedureCompleted(procId, data);
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index 35c868413e1..4d592b49d0d 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -526,6 +526,11 @@ public class VerifyingRSGroupAdmin implements Admin,
Closeable {
admin.rollWALWriter(serverName);
}
+ @Override
+ public Map<ServerName, Long> rollAllWALWriters() throws IOException {
+ return admin.rollAllWALWriters();
+ }
+
public CompactionState getCompactionState(TableName tableName) throws
IOException {
return admin.getCompactionState(tableName);
}
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb
b/hbase-shell/src/main/ruby/hbase/admin.rb
index 5ceaf2a08c7..93cc312338c 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -179,6 +179,12 @@ module Hbase
# TODO: remove older hlog_roll version
alias hlog_roll wal_roll
+
#----------------------------------------------------------------------------------------------
+  # Requests all region servers to roll their WAL writers
+ def wal_roll_all
+ @admin.rollAllWALWriters
+ end
+
#----------------------------------------------------------------------------------------------
# Requests a table or region split
def split(table_or_region_name, split_point = nil)
diff --git a/hbase-shell/src/main/ruby/shell.rb
b/hbase-shell/src/main/ruby/shell.rb
index 46b38dd96b8..6be3854b8a5 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -467,6 +467,7 @@ Shell.load_command_group(
unassign
zk_dump
wal_roll
+ wal_roll_all
hbck_chore_run
catalogjanitor_run
catalogjanitor_switch
diff --git a/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb
b/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb
new file mode 100644
index 00000000000..13d76449565
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb
@@ -0,0 +1,37 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+module Shell
+ module Commands
+ class WalRollAll < Command
+ def help
+ <<-EOF
+Request all region servers to roll their WAL writers. Note that this method is synchronous,
+which means it will block until all RegionServers have completed the log roll,
+or until a RegionServer fails with an exception for which retrying will not help. Here is how
+you would run the command in the hbase shell:
+ hbase> wal_roll_all
+EOF
+ end
+
+ def command
+ admin.wal_roll_all
+ end
+ end
+ end
+end
diff --git
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 0eff84bba7c..a0d73dcca21 100644
---
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -826,7 +826,11 @@ public class ThriftAdmin implements Admin {
@Override
public void rollWALWriter(ServerName serverName) {
throw new NotImplementedException("rollWALWriter not supported in
ThriftAdmin");
+ }
+ @Override
+ public Map<ServerName, Long> rollAllWALWriters() {
+ throw new NotImplementedException("rollAllWALWriters not supported in
ThriftAdmin");
}
@Override