This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new a6599df063d HBASE-27659 Incremental backups should re-use splits from last full backup (#6370)
a6599df063d is described below
commit a6599df063d6c551ee531860a2bfebda19d8c76f
Author: Hernan Romer <[email protected]>
AuthorDate: Thu Jan 23 07:35:41 2025 -0500
HBASE-27659 Incremental backups should re-use splits from last full backup (#6370)
Co-authored-by: Hernan Gelaf-Romer <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Ray Mattingly <[email protected]>
---
.../hadoop/hbase/backup/BackupRestoreFactory.java | 8 +-
.../org/apache/hadoop/hbase/backup/RestoreJob.java | 4 +
.../apache/hadoop/hbase/backup/RestoreRequest.java | 16 ++
.../backup/impl/IncrementalTableBackupClient.java | 143 +++++++++-----
.../hbase/backup/impl/RestoreTablesClient.java | 15 +-
.../mapreduce/MapReduceHFileSplitterJob.java | 12 +-
.../MapReduceRestoreToOriginalSplitsJob.java | 104 +++++++++++
.../hadoop/hbase/backup/util/BackupUtils.java | 13 +-
.../hadoop/hbase/backup/util/RestoreTool.java | 27 ++-
.../hadoop/hbase/backup/TestIncrementalBackup.java | 176 ++++++++++++++++++
.../apache/hadoop/hbase/mapreduce/WALPlayer.java | 12 +-
.../hbase/snapshot/SnapshotRegionLocator.java | 205 +++++++++++++++++++++
12 files changed, 671 insertions(+), 64 deletions(-)
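As a quick orientation before the diffs: the sketch below is not part of the commit; it only illustrates how a caller could opt into the new behaviour through the RestoreRequest builder option added in this patch (backupRootDir, backupId, sourceTable, targetTable and backupAdmin are placeholder names, and imports are omitted).

    RestoreRequest request = new RestoreRequest.Builder()
      .withBackupRootDir(backupRootDir)
      .withBackupId(backupId)
      .withCheck(false)
      .withFromTables(new TableName[] { sourceTable })
      .withToTables(new TableName[] { targetTable })
      .withOvewrite(true)                // pre-existing builder method (note the spelling)
      .withKeepOriginalSplits(true)      // new option introduced by this change
      .build();
    backupAdmin.restore(request);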
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
index 40bbb4bc7fe..d5126a187eb 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreToOriginalSplitsJob;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -43,8 +44,13 @@ public final class BackupRestoreFactory {
* @return backup restore job instance
*/
public static RestoreJob getRestoreJob(Configuration conf) {
+ Class<? extends RestoreJob> defaultCls =
+ conf.getBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, RestoreJob.KEEP_ORIGINAL_SPLITS_DEFAULT)
+ ? MapReduceRestoreToOriginalSplitsJob.class
+ : MapReduceRestoreJob.class;
+
Class<? extends RestoreJob> cls =
- conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreJob.class, RestoreJob.class);
+ conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, defaultCls, RestoreJob.class);
RestoreJob service = ReflectionUtils.newInstance(cls, conf);
service.setConf(conf);
return service;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java
index 831e097cb92..207684e7588 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java
@@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface RestoreJob extends Configurable {
+
+ String KEEP_ORIGINAL_SPLITS_KEY = "hbase.backup.restorejob.keep.original.splits";
+ boolean KEEP_ORIGINAL_SPLITS_DEFAULT = false;
+
/**
* Run restore operation
* @param dirPaths path array of WAL log directories
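For context, a minimal sketch (not part of the commit, imports omitted) of how this new key interacts with the factory change above: setting it flips the default restore implementation unless HBASE_INCR_RESTORE_IMPL_CLASS is configured explicitly.

    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, true);
    // With the BackupRestoreFactory change above, this now yields a
    // MapReduceRestoreToOriginalSplitsJob rather than a MapReduceRestoreJob.
    RestoreJob job = BackupRestoreFactory.getRestoreJob(conf);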
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java
index f7f1d848d95..6e52d312ab7 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java
@@ -67,6 +67,11 @@ public final class RestoreRequest {
return this;
}
+ public Builder withKeepOriginalSplits(boolean keepOriginalSplits) {
+ request.setKeepOriginalSplits(keepOriginalSplits);
+ return this;
+ }
+
public RestoreRequest build() {
return request;
}
@@ -80,6 +85,8 @@ public final class RestoreRequest {
private TableName[] toTables;
private boolean overwrite = false;
+ private boolean keepOriginalSplits = false;
+
private RestoreRequest() {
}
@@ -145,4 +152,13 @@ public final class RestoreRequest {
this.overwrite = overwrite;
return this;
}
+
+ public boolean isKeepOriginalSplits() {
+ return keepOriginalSplits;
+ }
+
+ private RestoreRequest setKeepOriginalSplits(boolean keepOriginalSplits) {
+ this.keepOriginalSplits = keepOriginalSplits;
+ return this;
+ }
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 03a6ecc02f3..9314ce2277d 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -30,7 +30,9 @@ import java.util.Set;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -40,14 +42,17 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -166,54 +171,60 @@ public class IncrementalTableBackupClient extends TableBackupClient {
LOG.debug("copying archive {} to {}", archive, tgt);
archiveFiles.add(archive.toString());
}
+ mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
+ incrementalCopyBulkloadHFiles(tgtFs, srcTable);
}
-
- copyBulkLoadedFiles(activeFiles, archiveFiles);
return bulkLoads;
}
- private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
- throws IOException {
- try {
- // Enable special mode of BackupDistCp
- conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
- // Copy active files
- String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
- int attempt = 1;
- while (activeFiles.size() > 0) {
- LOG.info("Copy " + activeFiles.size() + " active bulk loaded files.
Attempt =" + attempt++);
- String[] toCopy = new String[activeFiles.size()];
- activeFiles.toArray(toCopy);
- // Active file can be archived during copy operation,
- // we need to handle this properly
- try {
- incrementalCopyHFiles(toCopy, tgtDest);
- break;
- } catch (IOException e) {
- // Check if some files got archived
- // Update active and archived lists
- // When file is being moved from active to archive
- // directory, the number of active files decreases
- int numOfActive = activeFiles.size();
- updateFileLists(activeFiles, archiveFiles);
- if (activeFiles.size() < numOfActive) {
- continue;
- }
- // if not - throw exception
- throw e;
+ private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
+ TableName tn) throws IOException {
+ int attempt = 1;
+
+ while (!activeFiles.isEmpty()) {
+ LOG.info("MergeSplit {} active bulk loaded files. Attempt={}",
activeFiles.size(), attempt++);
+ // Active file can be archived during copy operation,
+ // we need to handle this properly
+ try {
+ mergeSplitBulkloads(activeFiles, tn);
+ break;
+ } catch (IOException e) {
+ int numActiveFiles = activeFiles.size();
+ updateFileLists(activeFiles, archiveFiles);
+ if (activeFiles.size() < numActiveFiles) {
+ continue;
}
+
+ throw e;
}
- // If incremental copy will fail for archived files
- // we will have partially loaded files in backup destination (only files from active data
- // directory). It is OK, because the backup will marked as FAILED and data will be cleaned up
- if (archiveFiles.size() > 0) {
- String[] toCopy = new String[archiveFiles.size()];
- archiveFiles.toArray(toCopy);
- incrementalCopyHFiles(toCopy, tgtDest);
- }
- } finally {
- // Disable special mode of BackupDistCp
- conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+ }
+
+ if (!archiveFiles.isEmpty()) {
+ mergeSplitBulkloads(archiveFiles, tn);
+ }
+ }
+
+ private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
+ MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+ conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
+ getBulkOutputDirForTable(tn).toString());
+ player.setConf(conf);
+
+ String inputDirs = StringUtils.join(files, ",");
+ String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };
+
+ int result;
+
+ try {
+ result = player.run(args);
+ } catch (Exception e) {
+ LOG.error("Failed to run MapReduceHFileSplitterJob", e);
+ throw new IOException(e);
+ }
+
+ if (result != 0) {
+ throw new IOException(
+ "Failed to run MapReduceHFileSplitterJob with invalid result: " +
result);
}
}
@@ -264,6 +275,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
try {
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+ setupRegionLocator();
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles();
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
@@ -407,6 +419,29 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
+ private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
+ Path bulkOutDir = getBulkOutputDirForTable(tn);
+ FileSystem fs = FileSystem.get(conf);
+
+ if (fs.exists(bulkOutDir)) {
+ conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+ Path tgtPath = getTargetDirForTable(tn);
+ try {
+ RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
+ List<String> files = new ArrayList<>();
+ while (locatedFiles.hasNext()) {
+ LocatedFileStatus file = locatedFiles.next();
+ if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
+ files.add(file.getPath().toString());
+ }
+ }
+ incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString());
+ } finally {
+ conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+ }
+ }
+ }
+
protected Path getBulkOutputDirForTable(TableName table) {
Path tablePath = getBulkOutputDir();
tablePath = new Path(tablePath, table.getNamespaceAsString());
@@ -422,6 +457,30 @@ public class IncrementalTableBackupClient extends TableBackupClient {
return path;
}
+ private Path getTargetDirForTable(TableName table) {
+ Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
+ path = new Path(path, table.getNamespaceAsString());
+ path = new Path(path, table.getNameAsString());
+ return path;
+ }
+
+ private void setupRegionLocator() throws IOException {
+ Map<TableName, String> fullBackupIds = getFullBackupIds();
+ try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
+
+ for (TableName tableName : backupInfo.getTables()) {
+ String fullBackupId = fullBackupIds.get(tableName);
+ BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
+ String snapshotName = fullBackupInfo.getSnapshotName(tableName);
+ Path root = HBackupFileSystem.getTableBackupPath(tableName,
+ new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
+ String manifestDir =
+ SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
+ SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
+ }
+ }
+ }
+
private Map<TableName, String> getFullBackupIds() throws IOException {
// Ancestors are stored from newest to oldest, so we can iterate backwards
// in order to populate our backupId map with the most recent full backup
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 0c3c5b40ffb..4ba56f95125 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -59,6 +59,8 @@ public class RestoreTablesClient {
private Path restoreRootDir;
private boolean isOverwrite;
+ private boolean isKeepOriginalSplits;
+
public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
this.backupRootDir = request.getBackupRootDir();
this.backupId = request.getBackupId();
@@ -68,6 +70,7 @@ public class RestoreTablesClient {
this.tTableArray = sTableArray;
}
this.isOverwrite = request.isOverwrite();
+ this.isKeepOriginalSplits = request.isKeepOriginalSplits();
this.conn = conn;
this.conf = conn.getConfiguration();
if (request.getRestoreRootDir() != null) {
@@ -132,7 +135,7 @@ public class RestoreTablesClient {
*/
private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
- boolean truncateIfExists) throws IOException {
+ boolean truncateIfExists, boolean isKeepOriginalSplits) throws IOException {
// First image MUST be image of a FULL backup
BackupImage image = images[0];
String rootDir = image.getRootDir();
@@ -148,7 +151,7 @@ public class RestoreTablesClient {
+ tableBackupPath.toString());
conf.set(JOB_NAME_CONF_KEY, "Full_Restore-" + backupId + "-" + tTable);
restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
- lastIncrBackupId);
+ isKeepOriginalSplits, lastIncrBackupId);
conf.unset(JOB_NAME_CONF_KEY);
} else { // incremental Backup
throw new IOException("Unexpected backup type " + image.getType());
@@ -183,7 +186,7 @@ public class RestoreTablesClient {
dirList.toArray(paths);
conf.set(JOB_NAME_CONF_KEY, "Incremental_Restore-" + backupId + "-" +
tTable);
restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new
TableName[] { sTable },
- new TableName[] { tTable }, lastIncrBackupId);
+ new TableName[] { tTable }, lastIncrBackupId, isKeepOriginalSplits);
LOG.info(sTable + " has been successfully restored to " + tTable);
}
@@ -208,7 +211,7 @@ public class RestoreTablesClient {
* @throws IOException exception
*/
private void restore(BackupManifest manifest, TableName[] sTableArray, TableName[] tTableArray,
- boolean isOverwrite) throws IOException {
+ boolean isOverwrite, boolean isKeepOriginalSplits) throws IOException {
TreeSet<BackupImage> restoreImageSet = new TreeSet<>();
for (int i = 0; i < sTableArray.length; i++) {
@@ -223,7 +226,7 @@ public class RestoreTablesClient {
set.addAll(depList);
BackupImage[] arr = new BackupImage[set.size()];
set.toArray(arr);
- restoreImages(arr, table, tTableArray[i], isOverwrite);
+ restoreImages(arr, table, tTableArray[i], isOverwrite, isKeepOriginalSplits);
restoreImageSet.addAll(list);
if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
LOG.info("Restore includes the following image(s):");
@@ -257,6 +260,6 @@ public class RestoreTablesClient {
Path rootPath = new Path(backupRootDir);
BackupManifest manifest = HBackupFileSystem.getManifest(conf, rootPath, backupId);
- restore(manifest, sTableArray, tTableArray, isOverwrite);
+ restore(manifest, sTableArray, tTableArray, isOverwrite, isKeepOriginalSplits);
}
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 755b0a41e32..28db0c605f7 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
@@ -117,7 +118,7 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
job.setMapOutputValueClass(MapReduceExtendedCell.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ RegionLocator regionLocator = getRegionLocator(conf, conn, tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
LOG.debug("success configuring load incremental job");
@@ -170,4 +171,13 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
int result = job.waitForCompletion(true) ? 0 : 1;
return result;
}
+
+ private static RegionLocator getRegionLocator(Configuration conf, Connection conn,
+ TableName table) throws IOException {
+ if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) {
+ return SnapshotRegionLocator.create(conf, table);
+ }
+
+ return conn.getRegionLocator(table);
+ }
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java
new file mode 100644
index 00000000000..942f69a2fb8
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
[email protected]
+public class MapReduceRestoreToOriginalSplitsJob implements RestoreJob {
+ private Configuration conf;
+
+ @Override
+ public void run(Path[] dirPaths, TableName[] fromTables, Path restoreRootDir,
+ TableName[] toTables, boolean fullBackupRestore) throws IOException {
+ Configuration conf = getConf();
+
+ // We are using the files from the snapshot. We should copy them rather than move them over
+ conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
+
+ FileSystem fs = FileSystem.get(conf);
+ Map<byte[], List<Path>> family2Files = buildFamily2Files(fs, dirPaths, fullBackupRestore);
+
+ BulkLoadHFiles bulkLoad = BulkLoadHFiles.create(conf);
+ for (int i = 0; i < fromTables.length; i++) {
+ bulkLoad.bulkLoad(toTables[i], family2Files);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ private static Map<byte[], List<Path>> buildFamily2Files(FileSystem fs, Path[] dirs,
+ boolean isFullBackup) throws IOException {
+ if (isFullBackup) {
+ return buildFullBackupFamily2Files(fs, dirs);
+ }
+
+ Map<byte[], List<Path>> family2Files = new HashMap<>();
+
+ for (Path dir : dirs) {
+ byte[] familyName = Bytes.toBytes(dir.getParent().getName());
+ if (family2Files.containsKey(familyName)) {
+ family2Files.get(familyName).add(dir);
+ } else {
+ family2Files.put(familyName, Lists.newArrayList(dir));
+ }
+ }
+
+ return family2Files;
+ }
+
+ private static Map<byte[], List<Path>> buildFullBackupFamily2Files(FileSystem fs, Path[] dirs)
+ throws IOException {
+ Map<byte[], List<Path>> family2Files = new HashMap<>();
+ for (Path regionPath : dirs) {
+ FSVisitor.visitRegionStoreFiles(fs, regionPath, (region, family, name) -> {
+ Path path = new Path(regionPath, new Path(family, name));
+ byte[] familyName = Bytes.toBytes(family);
+ if (family2Files.containsKey(familyName)) {
+ family2Files.get(familyName).add(path);
+ } else {
+ family2Files.put(familyName, Lists.newArrayList(path));
+ }
+ });
+ }
+ return family2Files;
+ }
+
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 9f1ee261b69..bb3d80a5c1b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -656,10 +656,17 @@ public final class BackupUtils {
*/
public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
+ return createRestoreRequest(backupRootDir, backupId, check, fromTables, toTables, isOverwrite,
+ false);
+ }
+
+ public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId,
+ boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite,
+ boolean isKeepOriginalSplits) {
RestoreRequest.Builder builder = new RestoreRequest.Builder();
- RestoreRequest request =
- builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
- .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
+ RestoreRequest request = builder.withBackupRootDir(backupRootDir).withBackupId(backupId)
+ .withCheck(check).withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite)
+ .withKeepOriginalSplits(isKeepOriginalSplits).build();
return request;
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index ff4e2672f7a..6248d7932dd 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -143,16 +143,19 @@ public class RestoreTool {
* During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently
* tableNames and newTablesNames only contain single table, will be expanded to multiple tables in
* the future
- * @param conn HBase connection
- * @param tableBackupPath backup path
- * @param logDirs : incremental backup folders, which contains WAL
- * @param tableNames : source tableNames(table names were backuped)
- * @param newTableNames : target tableNames(table names to be restored to)
- * @param incrBackupId incremental backup Id
+ * @param conn HBase connection
+ * @param tableBackupPath backup path
+ * @param logDirs : incremental backup folders, which contains WAL
+ * @param tableNames : source tableNames(table names were backuped)
+ * @param newTableNames : target tableNames(table names to be restored to)
+ * @param incrBackupId incremental backup Id
+ * @param keepOriginalSplits whether the original region splits from the full backup should be
+ * kept
* @throws IOException exception
*/
public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
- TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
+ TableName[] tableNames, TableName[] newTableNames, String incrBackupId,
+ boolean keepOriginalSplits) throws IOException {
try (Admin admin = conn.getAdmin()) {
if (tableNames.length != newTableNames.length) {
throw new IOException("Number of source tables and target tables does
not match!");
@@ -200,6 +203,7 @@ public class RestoreTool {
LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " +
newTableDescriptor);
}
}
+ conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, keepOriginalSplits);
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false);
@@ -207,9 +211,10 @@ public class RestoreTool {
}
public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
- TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+ TableName newTableName, boolean truncateIfExists, boolean isKeepOriginalSplits,
+ String lastIncrBackupId) throws IOException {
createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
- lastIncrBackupId);
+ isKeepOriginalSplits, lastIncrBackupId);
}
/**
@@ -283,7 +288,8 @@ public class RestoreTool {
}
private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName,
- Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+ Path tableBackupPath, boolean truncateIfExists, boolean isKeepOriginalSplits,
+ String lastIncrBackupId) throws IOException {
if (newTableName == null) {
newTableName = tableName;
}
@@ -349,6 +355,7 @@ public class RestoreTool {
// should only try to create the table with all region informations, so we could pre-split
// the regions in fine grain
checkAndCreateTable(conn, newTableName, regionPathList, tableDescriptor, truncateIfExists);
+ conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, isKeepOriginalSplits);
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
Path[] paths = new Path[regionPathList.size()];
regionPathList.toArray(paths);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index fce64c276e6..2c434a124ac 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -18,15 +18,23 @@
package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
@@ -43,9 +51,14 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -69,6 +82,8 @@ public class TestIncrementalBackup extends TestBackupBase {
HBaseClassTestRule.forClass(TestIncrementalBackup.class);
private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalBackup.class);
+ private static final byte[] BULKLOAD_START_KEY = new byte[] { 0x00 };
+ private static final byte[] BULKLOAD_END_KEY = new byte[] { Byte.MAX_VALUE };
@Parameterized.Parameters
public static Collection<Object[]> data() {
@@ -81,6 +96,34 @@ public class TestIncrementalBackup extends TestBackupBase {
public TestIncrementalBackup(Boolean b) {
}
+ @After
+ public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
+ TEST_UTIL.flush(table1);
+ TEST_UTIL.flush(table2);
+ TEST_UTIL.flush(table1_restore);
+
+ TEST_UTIL.truncateTable(table1).close();
+ TEST_UTIL.truncateTable(table2).close();
+ TEST_UTIL.truncateTable(table1_restore).close();
+
+ TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
+ try {
+ LogRoller walRoller = rst.getRegionServer().getWalRoller();
+ walRoller.requestRollAll();
+ walRoller.waitUntilWalRollFinished();
+ } catch (Exception ignored) {
+ }
+ });
+
+ try (Table table = TEST_UTIL.getConnection().getTable(table1)) {
+ loadTable(table);
+ }
+
+ try (Table table = TEST_UTIL.getConnection().getTable(table2)) {
+ loadTable(table);
+ }
+ }
+
// implement all test cases in 1 test since incremental
// backup/restore has dependencies
@Test
@@ -238,6 +281,101 @@ public class TestIncrementalBackup extends TestBackupBase {
}
}
+ @Test
+ public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+ byte[] mobFam = Bytes.toBytes("mob");
+
+ List<TableName> tables = Lists.newArrayList(table1);
+ TableDescriptor newTable1Desc =
+ TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+ TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+ Connection conn = TEST_UTIL.getConnection();
+ BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
+ BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+ String fullBackupId = backupAdmin.backupTables(request);
+ assertTrue(checkSucceeded(fullBackupId));
+
+ TableName[] fromTables = new TableName[] { table1 };
+ TableName[] toTables = new TableName[] { table1_restore };
+
+ List<LocatedFileStatus> preRestoreBackupFiles = getBackupFiles();
+ backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, fullBackupId, false,
+ fromTables, toTables, true, true));
+ List<LocatedFileStatus> postRestoreBackupFiles = getBackupFiles();
+
+ // Check that the backup files are the same before and after the restore process
+ Assert.assertEquals(postRestoreBackupFiles, preRestoreBackupFiles);
+ Assert.assertEquals(TEST_UTIL.countRows(table1_restore), NB_ROWS_IN_BATCH);
+
+ int ROWS_TO_ADD = 1_000;
+ // different IDs so that rows don't overlap
+ insertIntoTable(conn, table1, famName, 3, ROWS_TO_ADD);
+ insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD);
+
+ try (Admin admin = conn.getAdmin()) {
+ List<HRegion> currentRegions = TEST_UTIL.getHBaseCluster().getRegions(table1);
+ for (HRegion region : currentRegions) {
+ byte[] name = region.getRegionInfo().getEncodedNameAsBytes();
+ admin.splitRegionAsync(name).get();
+ }
+
+ TEST_UTIL.waitTableAvailable(table1);
+
+ // Make sure we've split regions
+ assertNotEquals(currentRegions, TEST_UTIL.getHBaseCluster().getRegions(table1));
+
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ String incrementalBackupId = backupAdmin.backupTables(request);
+ assertTrue(checkSucceeded(incrementalBackupId));
+ preRestoreBackupFiles = getBackupFiles();
+ backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId,
+ false, fromTables, toTables, true, true));
+ postRestoreBackupFiles = getBackupFiles();
+ Assert.assertEquals(postRestoreBackupFiles, preRestoreBackupFiles);
+ Assert.assertEquals(NB_ROWS_IN_BATCH + ROWS_TO_ADD + ROWS_TO_ADD,
+ TEST_UTIL.countRows(table1_restore));
+
+ // test bulkloads
+ HRegion regionToBulkload = TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+ String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+
+ insertIntoTable(conn, table1, famName, 5, ROWS_TO_ADD);
+ insertIntoTable(conn, table1, mobFam, 6, ROWS_TO_ADD);
+
+ doBulkload(table1, regionName, famName, mobFam);
+
+ // we need to major compact the regions to make sure there are no references
+ // and the regions are once again splittable
+ TEST_UTIL.compact(true);
+ TEST_UTIL.flush();
+ TEST_UTIL.waitTableAvailable(table1);
+
+ for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) {
+ if (region.isSplittable()) {
+ admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get();
+ }
+ }
+
+ request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+ incrementalBackupId = backupAdmin.backupTables(request);
+ assertTrue(checkSucceeded(incrementalBackupId));
+
+ preRestoreBackupFiles = getBackupFiles();
+ backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId,
+ false, fromTables, toTables, true, true));
+ postRestoreBackupFiles = getBackupFiles();
+
+ Assert.assertEquals(postRestoreBackupFiles, preRestoreBackupFiles);
+
+ int rowsExpected = TEST_UTIL.countRows(table1);
+ int rowsActual = TEST_UTIL.countRows(table1_restore);
+
+ Assert.assertEquals(rowsExpected, rowsActual);
+ }
+ }
+
private void checkThrowsCFMismatch(IOException ex, List<TableName> tables) {
Throwable cause = Throwables.getRootCause(ex);
assertEquals(cause.getClass(), ColumnFamilyMismatchException.class);
@@ -253,6 +391,32 @@ public class TestIncrementalBackup extends TestBackupBase {
return backupId;
}
+ private static void doBulkload(TableName tn, String regionName, byte[]... fams)
+ throws IOException {
+ Path regionDir = createHFiles(tn, regionName, fams);
+ Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> results =
+ BulkLoadHFiles.create(conf1).bulkLoad(tn, regionDir);
+ assertFalse(results.isEmpty());
+ }
+
+ private static Path createHFiles(TableName tn, String regionName, byte[]... fams)
+ throws IOException {
+ Path rootdir = CommonFSUtils.getRootDir(conf1);
+ Path regionDir = CommonFSUtils.getRegionDir(rootdir, tn, regionName);
+
+ FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ fs.mkdirs(rootdir);
+
+ for (byte[] fam : fams) {
+ Path famDir = new Path(regionDir, Bytes.toString(fam));
+ Path hFileDir = new Path(famDir, UUID.randomUUID().toString());
+ HFileTestUtil.createHFile(conf1, fs, hFileDir, fam, qualName, BULKLOAD_START_KEY,
+ BULKLOAD_END_KEY, 1000);
+ }
+
+ return regionDir;
+ }
+
/**
* Check that backup manifest can be produced for a different root. Users may want to move
* existing backups to a different location.
@@ -269,4 +433,16 @@ public class TestIncrementalBackup extends TestBackupBase {
assertEquals(anotherRootDir, ancestor.getRootDir());
}
}
+
+ private List<LocatedFileStatus> getBackupFiles() throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ RemoteIterator<LocatedFileStatus> iter = fs.listFiles(new Path(BACKUP_ROOT_DIR), true);
+ List<LocatedFileStatus> files = new ArrayList<>();
+
+ while (iter.hasNext()) {
+ files.add(iter.next());
+ }
+
+ return files;
+ }
}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 696e5257244..e06300848f6 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
@@ -379,7 +380,7 @@ public class WALPlayer extends Configured implements Tool {
List<TableInfo> tableInfoList = new ArrayList<>();
for (TableName tableName : tableNames) {
Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName);
+ RegionLocator regionLocator = getRegionLocator(tableName, conf, conn);
tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
}
if (multiTableSupport) {
@@ -475,4 +476,13 @@ public class WALPlayer extends Configured implements Tool {
Job job = createSubmittableJob(args);
return job.waitForCompletion(true) ? 0 : 1;
}
+
+ private static RegionLocator getRegionLocator(TableName tableName, Configuration conf,
+ Connection conn) throws IOException {
+ if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, tableName)) {
+ return SnapshotRegionLocator.create(conf, tableName);
+ }
+
+ return conn.getRegionLocator(tableName);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java
new file mode 100644
index 00000000000..5c90dd900f7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+
+/**
+ * {@link RegionLocator} built using the most recent full backup's snapshot manifest for a given
+ * table. Useful for aligning any subsequent incremental backups along the same splits as the full
+ * backup
+ */
[email protected]
+public final class SnapshotRegionLocator implements RegionLocator {
+
+ private static final String SNAPSHOT_MANIFEST_DIR_PREFIX =
+ "region.locator.snapshot.manifest.dir.";
+
+ private static final ServerName FAKE_SERVER_NAME =
+ ServerName.parseServerName("www.example.net,1234,1212121212");
+
+ private final TableName tableName;
+ private final TreeMap<byte[], HRegionReplicas> regions;
+
+ private final List<HRegionLocation> rawLocations;
+
+ public static SnapshotRegionLocator create(Configuration conf, TableName table)
+ throws IOException {
+ Path workingDir = new Path(conf.get(getSnapshotManifestDirKey(table)));
+ FileSystem fs = workingDir.getFileSystem(conf);
+ SnapshotProtos.SnapshotDescription desc =
+ SnapshotDescriptionUtils.readSnapshotInfo(fs, workingDir);
+ SnapshotManifest manifest = SnapshotManifest.open(conf, fs, workingDir, desc);
+
+ TableName tableName = manifest.getTableDescriptor().getTableName();
+ TreeMap<byte[], HRegionReplicas> replicas = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ List<HRegionLocation> rawLocations = new ArrayList<>();
+
+ for (SnapshotProtos.SnapshotRegionManifest region : manifest.getRegionManifests()) {
+ HBaseProtos.RegionInfo ri = region.getRegionInfo();
+ byte[] key = ri.getStartKey().toByteArray();
+
+ SnapshotHRegionLocation location = toLocation(ri, tableName);
+ rawLocations.add(location);
+ HRegionReplicas hrr = replicas.get(key);
+
+ if (hrr == null) {
+ hrr = new HRegionReplicas(location);
+ } else {
+ hrr.addReplica(location);
+ }
+
+ replicas.put(key, hrr);
+ }
+
+ return new SnapshotRegionLocator(tableName, replicas, rawLocations);
+ }
+
+ private SnapshotRegionLocator(TableName tableName, TreeMap<byte[], HRegionReplicas> regions,
+ List<HRegionLocation> rawLocations) {
+ this.tableName = tableName;
+ this.regions = regions;
+ this.rawLocations = rawLocations;
+ }
+
+ @Override
+ public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload)
+ throws IOException {
+ return regions.floorEntry(row).getValue().getReplica(replicaId);
+ }
+
+ @Override
+ public List<HRegionLocation> getRegionLocations(byte[] row, boolean reload) throws IOException {
+ return Collections.singletonList(getRegionLocation(row, reload));
+ }
+
+ @Override
+ public void clearRegionLocationCache() {
+
+ }
+
+ @Override
+ public List<HRegionLocation> getAllRegionLocations() throws IOException {
+ return rawLocations;
+ }
+
+ @Override
+ public TableName getName() {
+ return tableName;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ public static boolean shouldUseSnapshotRegionLocator(Configuration conf, TableName table) {
+ return conf.get(getSnapshotManifestDirKey(table)) != null;
+ }
+
+ public static void setSnapshotManifestDir(Configuration conf, String dir, TableName table) {
+ conf.set(getSnapshotManifestDirKey(table), dir);
+ }
+
+ private static String getSnapshotManifestDirKey(TableName table) {
+ return SNAPSHOT_MANIFEST_DIR_PREFIX + table.getNameWithNamespaceInclAsString();
+ }
+
+ private static SnapshotHRegionLocation toLocation(HBaseProtos.RegionInfo ri,
+ TableName tableName) {
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(ri.getStartKey().toByteArray()).setEndKey(ri.getEndKey().toByteArray())
+ .setRegionId(ri.getRegionId()).setReplicaId(ri.getReplicaId()).build();
+
+ return new SnapshotHRegionLocation(region);
+ }
+
+ private static final class HRegionReplicas {
+ private final Map<Integer, SnapshotHRegionLocation> replicas = new HashMap<>();
+
+ private HRegionReplicas(SnapshotHRegionLocation region) {
+ addReplica(region);
+ }
+
+ private void addReplica(SnapshotHRegionLocation replica) {
+ this.replicas.put(replica.getRegion().getReplicaId(), replica);
+ }
+
+ private HRegionLocation getReplica(int replicaId) {
+ return replicas.get(replicaId);
+ }
+ }
+
+ public static final class SnapshotHRegionLocation extends HRegionLocation {
+
+ public SnapshotHRegionLocation(RegionInfo regionInfo) {
+ super(regionInfo, FAKE_SERVER_NAME);
+ }
+
+ @Override
+ public ServerName getServerName() {
+ throw new NotImplementedException("SnapshotHRegionLocation doesn't have
a server name");
+ }
+
+ @Override
+ public String getHostname() {
+ throw new NotImplementedException("SnapshotHRegionLocation doesn't have
a host name");
+ }
+
+ @Override
+ public int getPort() {
+ throw new NotImplementedException("SnapshotHRegionLocation doesn't have
a port");
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getRegion().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int compareTo(HRegionLocation o) {
+ return this.getRegion().compareTo(o.getRegion());
+ }
+ }
+}
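Closing note: a rough sketch (not part of the commit; the table name and manifest path below are hypothetical, and the real wiring is done by IncrementalTableBackupClient#setupRegionLocator above) of how the snapshot-backed locator is selected.

    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("ns:t1");  // hypothetical table
    String manifestDir = "hdfs://nn/backups/backup_001/ns/t1/.hbase-snapshot/snap_t1";  // hypothetical
    SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, table);

    // WALPlayer and MapReduceHFileSplitterJob consult this before falling back to the
    // live cluster's locator, so HFileOutputFormat2 partitions output along the full
    // backup's splits.
    if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) {
      RegionLocator locator = SnapshotRegionLocator.create(conf, table);
      // region boundaries here mirror the full backup's snapshot manifest
      locator.close();
    }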