This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new f3cecb18d1b HBASE-27659 Incremental backups should re-use splits from 
last full backup (#6370)
f3cecb18d1b is described below

commit f3cecb18d1b45a54de898fe33852ce6e136ce719
Author: Hernan Romer <[email protected]>
AuthorDate: Thu Jan 23 07:35:41 2025 -0500

    HBASE-27659 Incremental backups should re-use splits from last full backup 
(#6370)
    
    Co-authored-by: Hernan Gelaf-Romer <[email protected]>
    Signed-off-by: Nick Dimiduk <[email protected]>
    Signed-off-by: Ray Mattingly <[email protected]>
---
 .../hadoop/hbase/backup/BackupRestoreFactory.java  |   8 +-
 .../org/apache/hadoop/hbase/backup/RestoreJob.java |   4 +
 .../apache/hadoop/hbase/backup/RestoreRequest.java |  16 ++
 .../backup/impl/IncrementalTableBackupClient.java  | 143 +++++++++-----
 .../hbase/backup/impl/RestoreTablesClient.java     |  15 +-
 .../mapreduce/MapReduceHFileSplitterJob.java       |  12 +-
 .../MapReduceRestoreToOriginalSplitsJob.java       | 104 +++++++++++
 .../hadoop/hbase/backup/util/BackupUtils.java      |  13 +-
 .../hadoop/hbase/backup/util/RestoreTool.java      |  27 ++-
 .../hadoop/hbase/backup/TestIncrementalBackup.java | 176 ++++++++++++++++++
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   |  12 +-
 .../hbase/snapshot/SnapshotRegionLocator.java      | 205 +++++++++++++++++++++
 12 files changed, 671 insertions(+), 64 deletions(-)

diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
index 40bbb4bc7fe..d5126a187eb 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
+import 
org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreToOriginalSplitsJob;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -43,8 +44,13 @@ public final class BackupRestoreFactory {
    * @return backup restore job instance
    */
   public static RestoreJob getRestoreJob(Configuration conf) {
+    Class<? extends RestoreJob> defaultCls =
+      conf.getBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, 
RestoreJob.KEEP_ORIGINAL_SPLITS_DEFAULT)
+        ? MapReduceRestoreToOriginalSplitsJob.class
+        : MapReduceRestoreJob.class;
+
     Class<? extends RestoreJob> cls =
-      conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreJob.class, 
RestoreJob.class);
+      conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, defaultCls, 
RestoreJob.class);
     RestoreJob service = ReflectionUtils.newInstance(cls, conf);
     service.setConf(conf);
     return service;
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java
index 831e097cb92..207684e7588 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreJob.java
@@ -30,6 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 public interface RestoreJob extends Configurable {
+
+  String KEEP_ORIGINAL_SPLITS_KEY = 
"hbase.backup.restorejob.keep.original.splits";
+  boolean KEEP_ORIGINAL_SPLITS_DEFAULT = false;
+
   /**
    * Run restore operation
    * @param dirPaths          path array of WAL log directories
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java
index f7f1d848d95..6e52d312ab7 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreRequest.java
@@ -67,6 +67,11 @@ public final class RestoreRequest {
       return this;
     }
 
+    public Builder withKeepOriginalSplits(boolean keepOriginalSplits) {
+      request.setKeepOriginalSplits(keepOriginalSplits);
+      return this;
+    }
+
     public RestoreRequest build() {
       return request;
     }
@@ -80,6 +85,8 @@ public final class RestoreRequest {
   private TableName[] toTables;
   private boolean overwrite = false;
 
+  private boolean keepOriginalSplits = false;
+
   private RestoreRequest() {
   }
 
@@ -145,4 +152,13 @@ public final class RestoreRequest {
     this.overwrite = overwrite;
     return this;
   }
+
+  public boolean isKeepOriginalSplits() {
+    return keepOriginalSplits;
+  }
+
+  private RestoreRequest setKeepOriginalSplits(boolean keepOriginalSplits) {
+    this.keepOriginalSplits = keepOriginalSplits;
+    return this;
+  }
 }
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 03a6ecc02f3..9314ce2277d 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -30,7 +30,9 @@ import java.util.Set;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -40,14 +42,17 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -166,54 +171,60 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
         LOG.debug("copying archive {} to {}", archive, tgt);
         archiveFiles.add(archive.toString());
       }
+      mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
+      incrementalCopyBulkloadHFiles(tgtFs, srcTable);
     }
-
-    copyBulkLoadedFiles(activeFiles, archiveFiles);
     return bulkLoads;
   }
 
-  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> 
archiveFiles)
-    throws IOException {
-    try {
-      // Enable special mode of BackupDistCp
-      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
-      // Copy active files
-      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + 
backupInfo.getBackupId();
-      int attempt = 1;
-      while (activeFiles.size() > 0) {
-        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. 
Attempt =" + attempt++);
-        String[] toCopy = new String[activeFiles.size()];
-        activeFiles.toArray(toCopy);
-        // Active file can be archived during copy operation,
-        // we need to handle this properly
-        try {
-          incrementalCopyHFiles(toCopy, tgtDest);
-          break;
-        } catch (IOException e) {
-          // Check if some files got archived
-          // Update active and archived lists
-          // When file is being moved from active to archive
-          // directory, the number of active files decreases
-          int numOfActive = activeFiles.size();
-          updateFileLists(activeFiles, archiveFiles);
-          if (activeFiles.size() < numOfActive) {
-            continue;
-          }
-          // if not - throw exception
-          throw e;
+  private void mergeSplitBulkloads(List<String> activeFiles, List<String> 
archiveFiles,
+    TableName tn) throws IOException {
+    int attempt = 1;
+
+    while (!activeFiles.isEmpty()) {
+      LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", 
activeFiles.size(), attempt++);
+      // Active file can be archived during copy operation,
+      // we need to handle this properly
+      try {
+        mergeSplitBulkloads(activeFiles, tn);
+        break;
+      } catch (IOException e) {
+        int numActiveFiles = activeFiles.size();
+        updateFileLists(activeFiles, archiveFiles);
+        if (activeFiles.size() < numActiveFiles) {
+          continue;
         }
+
+        throw e;
       }
-      // If incremental copy will fail for archived files
-      // we will have partially loaded files in backup destination (only files 
from active data
-      // directory). It is OK, because the backup will marked as FAILED and 
data will be cleaned up
-      if (archiveFiles.size() > 0) {
-        String[] toCopy = new String[archiveFiles.size()];
-        archiveFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
-      }
-    } finally {
-      // Disable special mode of BackupDistCp
-      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    }
+
+    if (!archiveFiles.isEmpty()) {
+      mergeSplitBulkloads(archiveFiles, tn);
+    }
+  }
+
+  private void mergeSplitBulkloads(List<String> files, TableName tn) throws 
IOException {
+    MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
+      getBulkOutputDirForTable(tn).toString());
+    player.setConf(conf);
+
+    String inputDirs = StringUtils.join(files, ",");
+    String[] args = { inputDirs, tn.getNameWithNamespaceInclAsString() };
+
+    int result;
+
+    try {
+      result = player.run(args);
+    } catch (Exception e) {
+      LOG.error("Failed to run MapReduceHFileSplitterJob", e);
+      throw new IOException(e);
+    }
+
+    if (result != 0) {
+      throw new IOException(
+        "Failed to run MapReduceHFileSplitterJob with invalid result: " + 
result);
     }
   }
 
@@ -264,6 +275,7 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+      setupRegionLocator();
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
       convertWALsToHFiles();
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
@@ -407,6 +419,29 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
     }
   }
 
+  private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) 
throws IOException {
+    Path bulkOutDir = getBulkOutputDirForTable(tn);
+    FileSystem fs = FileSystem.get(conf);
+
+    if (fs.exists(bulkOutDir)) {
+      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+      Path tgtPath = getTargetDirForTable(tn);
+      try {
+        RemoteIterator<LocatedFileStatus> locatedFiles = 
tgtFs.listFiles(bulkOutDir, true);
+        List<String> files = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+          LocatedFileStatus file = locatedFiles.next();
+          if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
+            files.add(file.getPath().toString());
+          }
+        }
+        incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), 
tgtPath.toString());
+      } finally {
+        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+      }
+    }
+  }
+
   protected Path getBulkOutputDirForTable(TableName table) {
     Path tablePath = getBulkOutputDir();
     tablePath = new Path(tablePath, table.getNamespaceAsString());
@@ -422,6 +457,30 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
     return path;
   }
 
+  private Path getTargetDirForTable(TableName table) {
+    Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + 
backupInfo.getBackupId());
+    path = new Path(path, table.getNamespaceAsString());
+    path = new Path(path, table.getNameAsString());
+    return path;
+  }
+
+  private void setupRegionLocator() throws IOException {
+    Map<TableName, String> fullBackupIds = getFullBackupIds();
+    try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
+
+      for (TableName tableName : backupInfo.getTables()) {
+        String fullBackupId = fullBackupIds.get(tableName);
+        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
+        String snapshotName = fullBackupInfo.getSnapshotName(tableName);
+        Path root = HBackupFileSystem.getTableBackupPath(tableName,
+          new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
+        String manifestDir =
+          SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, 
root).toString();
+        SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, 
tableName);
+      }
+    }
+  }
+
   private Map<TableName, String> getFullBackupIds() throws IOException {
     // Ancestors are stored from newest to oldest, so we can iterate backwards
     // in order to populate our backupId map with the most recent full backup
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index 0c3c5b40ffb..4ba56f95125 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -59,6 +59,8 @@ public class RestoreTablesClient {
   private Path restoreRootDir;
   private boolean isOverwrite;
 
+  private boolean isKeepOriginalSplits;
+
   public RestoreTablesClient(Connection conn, RestoreRequest request) throws 
IOException {
     this.backupRootDir = request.getBackupRootDir();
     this.backupId = request.getBackupId();
@@ -68,6 +70,7 @@ public class RestoreTablesClient {
       this.tTableArray = sTableArray;
     }
     this.isOverwrite = request.isOverwrite();
+    this.isKeepOriginalSplits = request.isKeepOriginalSplits();
     this.conn = conn;
     this.conf = conn.getConfiguration();
     if (request.getRestoreRootDir() != null) {
@@ -132,7 +135,7 @@ public class RestoreTablesClient {
    */
 
   private void restoreImages(BackupImage[] images, TableName sTable, TableName 
tTable,
-    boolean truncateIfExists) throws IOException {
+    boolean truncateIfExists, boolean isKeepOriginalSplits) throws IOException 
{
     // First image MUST be image of a FULL backup
     BackupImage image = images[0];
     String rootDir = image.getRootDir();
@@ -148,7 +151,7 @@ public class RestoreTablesClient {
         + tableBackupPath.toString());
       conf.set(JOB_NAME_CONF_KEY, "Full_Restore-" + backupId + "-" + tTable);
       restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, 
truncateIfExists,
-        lastIncrBackupId);
+        isKeepOriginalSplits, lastIncrBackupId);
       conf.unset(JOB_NAME_CONF_KEY);
     } else { // incremental Backup
       throw new IOException("Unexpected backup type " + image.getType());
@@ -183,7 +186,7 @@ public class RestoreTablesClient {
     dirList.toArray(paths);
     conf.set(JOB_NAME_CONF_KEY, "Incremental_Restore-" + backupId + "-" + 
tTable);
     restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new 
TableName[] { sTable },
-      new TableName[] { tTable }, lastIncrBackupId);
+      new TableName[] { tTable }, lastIncrBackupId, isKeepOriginalSplits);
     LOG.info(sTable + " has been successfully restored to " + tTable);
   }
 
@@ -208,7 +211,7 @@ public class RestoreTablesClient {
    * @throws IOException exception
    */
   private void restore(BackupManifest manifest, TableName[] sTableArray, 
TableName[] tTableArray,
-    boolean isOverwrite) throws IOException {
+    boolean isOverwrite, boolean isKeepOriginalSplits) throws IOException {
     TreeSet<BackupImage> restoreImageSet = new TreeSet<>();
 
     for (int i = 0; i < sTableArray.length; i++) {
@@ -223,7 +226,7 @@ public class RestoreTablesClient {
       set.addAll(depList);
       BackupImage[] arr = new BackupImage[set.size()];
       set.toArray(arr);
-      restoreImages(arr, table, tTableArray[i], isOverwrite);
+      restoreImages(arr, table, tTableArray[i], isOverwrite, 
isKeepOriginalSplits);
       restoreImageSet.addAll(list);
       if (restoreImageSet != null && !restoreImageSet.isEmpty()) {
         LOG.info("Restore includes the following image(s):");
@@ -257,6 +260,6 @@ public class RestoreTablesClient {
     Path rootPath = new Path(backupRootDir);
     BackupManifest manifest = HBackupFileSystem.getManifest(conf, rootPath, 
backupId);
 
-    restore(manifest, sTableArray, tTableArray, isOverwrite);
+    restore(manifest, sTableArray, tTableArray, isOverwrite, 
isKeepOriginalSplits);
   }
 }
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 755b0a41e32..28db0c605f7 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
@@ -117,7 +118,7 @@ public class MapReduceHFileSplitterJob extends Configured 
implements Tool {
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
       try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(tableName);
-        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        RegionLocator regionLocator = getRegionLocator(conf, conn, tableName)) 
{
         HFileOutputFormat2.configureIncrementalLoad(job, 
table.getDescriptor(), regionLocator);
       }
       LOG.debug("success configuring load incremental job");
@@ -170,4 +171,13 @@ public class MapReduceHFileSplitterJob extends Configured 
implements Tool {
     int result = job.waitForCompletion(true) ? 0 : 1;
     return result;
   }
+
+  private static RegionLocator getRegionLocator(Configuration conf, Connection 
conn,
+    TableName table) throws IOException {
+    if (SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, table)) {
+      return SnapshotRegionLocator.create(conf, table);
+    }
+
+    return conn.getRegionLocator(table);
+  }
 }
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java
new file mode 100644
index 00000000000..942f69a2fb8
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreToOriginalSplitsJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@InterfaceAudience.Private
+public class MapReduceRestoreToOriginalSplitsJob implements RestoreJob {
+  private Configuration conf;
+
+  @Override
+  public void run(Path[] dirPaths, TableName[] fromTables, Path restoreRootDir,
+    TableName[] toTables, boolean fullBackupRestore) throws IOException {
+    Configuration conf = getConf();
+
+    // We are using the files from the snapshot. We should copy them rather 
than move them over
+    conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
+
+    FileSystem fs = FileSystem.get(conf);
+    Map<byte[], List<Path>> family2Files = buildFamily2Files(fs, dirPaths, 
fullBackupRestore);
+
+    BulkLoadHFiles bulkLoad = BulkLoadHFiles.create(conf);
+    for (int i = 0; i < fromTables.length; i++) {
+      bulkLoad.bulkLoad(toTables[i], family2Files);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  private static Map<byte[], List<Path>> buildFamily2Files(FileSystem fs, 
Path[] dirs,
+    boolean isFullBackup) throws IOException {
+    if (isFullBackup) {
+      return buildFullBackupFamily2Files(fs, dirs);
+    }
+
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+
+    for (Path dir : dirs) {
+      byte[] familyName = Bytes.toBytes(dir.getParent().getName());
+      if (family2Files.containsKey(familyName)) {
+        family2Files.get(familyName).add(dir);
+      } else {
+        family2Files.put(familyName, Lists.newArrayList(dir));
+      }
+    }
+
+    return family2Files;
+  }
+
+  private static Map<byte[], List<Path>> 
buildFullBackupFamily2Files(FileSystem fs, Path[] dirs)
+    throws IOException {
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    for (Path regionPath : dirs) {
+      FSVisitor.visitRegionStoreFiles(fs, regionPath, (region, family, name) 
-> {
+        Path path = new Path(regionPath, new Path(family, name));
+        byte[] familyName = Bytes.toBytes(family);
+        if (family2Files.containsKey(familyName)) {
+          family2Files.get(familyName).add(path);
+        } else {
+          family2Files.put(familyName, Lists.newArrayList(path));
+        }
+      });
+    }
+    return family2Files;
+  }
+
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 9f1ee261b69..bb3d80a5c1b 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -656,10 +656,17 @@ public final class BackupUtils {
    */
   public static RestoreRequest createRestoreRequest(String backupRootDir, 
String backupId,
     boolean check, TableName[] fromTables, TableName[] toTables, boolean 
isOverwrite) {
+    return createRestoreRequest(backupRootDir, backupId, check, fromTables, 
toTables, isOverwrite,
+      false);
+  }
+
+  public static RestoreRequest createRestoreRequest(String backupRootDir, 
String backupId,
+    boolean check, TableName[] fromTables, TableName[] toTables, boolean 
isOverwrite,
+    boolean isKeepOriginalSplits) {
     RestoreRequest.Builder builder = new RestoreRequest.Builder();
-    RestoreRequest request =
-      
builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
-        
.withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
+    RestoreRequest request = 
builder.withBackupRootDir(backupRootDir).withBackupId(backupId)
+      
.withCheck(check).withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite)
+      .withKeepOriginalSplits(isKeepOriginalSplits).build();
     return request;
   }
 
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index ff4e2672f7a..6248d7932dd 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -143,16 +143,19 @@ public class RestoreTool {
    * During incremental backup operation. Call WalPlayer to replay WAL in 
backup image Currently
    * tableNames and newTablesNames only contain single table, will be expanded 
to multiple tables in
    * the future
-   * @param conn            HBase connection
-   * @param tableBackupPath backup path
-   * @param logDirs         : incremental backup folders, which contains WAL
-   * @param tableNames      : source tableNames(table names were backuped)
-   * @param newTableNames   : target tableNames(table names to be restored to)
-   * @param incrBackupId    incremental backup Id
+   * @param conn               HBase connection
+   * @param tableBackupPath    backup path
+   * @param logDirs            : incremental backup folders, which contains WAL
+   * @param tableNames         : source tableNames(table names were backuped)
+   * @param newTableNames      : target tableNames(table names to be restored 
to)
+   * @param incrBackupId       incremental backup Id
+   * @param keepOriginalSplits whether the original region splits from the 
full backup should be
+   *                           kept
    * @throws IOException exception
    */
   public void incrementalRestoreTable(Connection conn, Path tableBackupPath, 
Path[] logDirs,
-    TableName[] tableNames, TableName[] newTableNames, String incrBackupId) 
throws IOException {
+    TableName[] tableNames, TableName[] newTableNames, String incrBackupId,
+    boolean keepOriginalSplits) throws IOException {
     try (Admin admin = conn.getAdmin()) {
       if (tableNames.length != newTableNames.length) {
         throw new IOException("Number of source tables and target tables does 
not match!");
@@ -200,6 +203,7 @@ public class RestoreTool {
           LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + 
newTableDescriptor);
         }
       }
+      conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, keepOriginalSplits);
       RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
 
       restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, 
false);
@@ -207,9 +211,10 @@ public class RestoreTool {
   }
 
   public void fullRestoreTable(Connection conn, Path tableBackupPath, 
TableName tableName,
-    TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) 
throws IOException {
+    TableName newTableName, boolean truncateIfExists, boolean 
isKeepOriginalSplits,
+    String lastIncrBackupId) throws IOException {
     createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, 
truncateIfExists,
-      lastIncrBackupId);
+      isKeepOriginalSplits, lastIncrBackupId);
   }
 
   /**
@@ -283,7 +288,8 @@ public class RestoreTool {
   }
 
   private void createAndRestoreTable(Connection conn, TableName tableName, 
TableName newTableName,
-    Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) 
throws IOException {
+    Path tableBackupPath, boolean truncateIfExists, boolean 
isKeepOriginalSplits,
+    String lastIncrBackupId) throws IOException {
     if (newTableName == null) {
       newTableName = tableName;
     }
@@ -349,6 +355,7 @@ public class RestoreTool {
       // should only try to create the table with all region informations, so 
we could pre-split
       // the regions in fine grain
       checkAndCreateTable(conn, newTableName, regionPathList, tableDescriptor, 
truncateIfExists);
+      conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, 
isKeepOriginalSplits);
       RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
       Path[] paths = new Path[regionPathList.size()];
       regionPathList.toArray(paths);
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index fce64c276e6..2c434a124ac 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -18,15 +18,23 @@
 package org.apache.hadoop.hbase.backup;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
@@ -43,9 +51,14 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -69,6 +82,8 @@ public class TestIncrementalBackup extends TestBackupBase {
     HBaseClassTestRule.forClass(TestIncrementalBackup.class);
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestIncrementalBackup.class);
+  // Start/end row keys handed to HFileTestUtil.createHFile when generating bulk-load HFiles.
+  private static final byte[] BULKLOAD_START_KEY = new byte[] { 0x00 };
+  private static final byte[] BULKLOAD_END_KEY = new byte[] { Byte.MAX_VALUE };
 
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
@@ -81,6 +96,34 @@ public class TestIncrementalBackup extends TestBackupBase {
   public TestIncrementalBackup(Boolean b) {
   }
 
+  /**
+   * Cleanup run after every test: flushes and truncates {@code table1}, {@code table2} and
+   * {@code table1_restore}, asks every region server to roll all of its WALs, and finally calls
+   * {@code loadTable} on {@code table1} and {@code table2} again.
+   * @throws Exception if flushing, truncating or reloading a table fails
+   */
+  @After
+  public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
+    TEST_UTIL.flush(table1);
+    TEST_UTIL.flush(table2);
+    TEST_UTIL.flush(table1_restore);
+
+    TEST_UTIL.truncateTable(table1).close();
+    TEST_UTIL.truncateTable(table2).close();
+    TEST_UTIL.truncateTable(table1_restore).close();
+
+    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
+      try {
+        LogRoller walRoller = rst.getRegionServer().getWalRoller();
+        walRoller.requestRollAll();
+        walRoller.waitUntilWalRollFinished();
+      } catch (Exception e) {
+        // Rolling is best effort and must not fail the cleanup, but a failure should be visible
+        // in the test log instead of being dropped silently.
+        LOG.warn("Failed to roll WALs during test cleanup", e);
+      }
+    });
+
+    try (Table table = TEST_UTIL.getConnection().getTable(table1)) {
+      loadTable(table);
+    }
+
+    try (Table table = TEST_UTIL.getConnection().getTable(table2)) {
+      loadTable(table);
+    }
+  }
+
   // implement all test cases in 1 test since incremental
   // backup/restore has dependencies
   @Test
@@ -238,6 +281,101 @@ public class TestIncrementalBackup extends TestBackupBase 
{
     }
   }
 
+  /**
+   * Takes a full backup of {@code table1} followed by two incremental backups, splitting the
+   * table's regions (and bulk loading HFiles) in between, and restores each backup into
+   * {@code table1_restore}. After every restore it checks that the files under the backup root
+   * are unchanged and that the restored table holds the expected number of rows.
+   */
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    Connection conn = TEST_UTIL.getConnection();
+    BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
+    BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
+    String fullBackupId = backupAdmin.backupTables(request);
+    assertTrue(checkSucceeded(fullBackupId));
+
+    TableName[] fromTables = new TableName[] { table1 };
+    TableName[] toTables = new TableName[] { table1_restore };
+
+    List<LocatedFileStatus> preRestoreBackupFiles = getBackupFiles();
+    backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, fullBackupId, false,
+      fromTables, toTables, true, true));
+    List<LocatedFileStatus> postRestoreBackupFiles = getBackupFiles();
+
+    // Check that the backup files are the same before and after the restore process
+    // (JUnit's assertEquals takes the expected value first, then the actual one).
+    Assert.assertEquals(preRestoreBackupFiles, postRestoreBackupFiles);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(table1_restore));
+
+    int rowsToAdd = 1_000;
+    // different IDs so that rows don't overlap
+    insertIntoTable(conn, table1, famName, 3, rowsToAdd);
+    insertIntoTable(conn, table1, mobFam, 4, rowsToAdd);
+
+    try (Admin admin = conn.getAdmin()) {
+      List<HRegion> currentRegions = TEST_UTIL.getHBaseCluster().getRegions(table1);
+      for (HRegion region : currentRegions) {
+        byte[] name = region.getRegionInfo().getEncodedNameAsBytes();
+        admin.splitRegionAsync(name).get();
+      }
+
+      TEST_UTIL.waitTableAvailable(table1);
+
+      // Make sure we've split regions
+      assertNotEquals(currentRegions, TEST_UTIL.getHBaseCluster().getRegions(table1));
+
+      request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+      String incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+      preRestoreBackupFiles = getBackupFiles();
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId,
+        false, fromTables, toTables, true, true));
+      postRestoreBackupFiles = getBackupFiles();
+      Assert.assertEquals(preRestoreBackupFiles, postRestoreBackupFiles);
+      Assert.assertEquals(NB_ROWS_IN_BATCH + rowsToAdd + rowsToAdd,
+        TEST_UTIL.countRows(table1_restore));
+
+      // test bulkloads
+      HRegion regionToBulkload = TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+      String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+
+      insertIntoTable(conn, table1, famName, 5, rowsToAdd);
+      insertIntoTable(conn, table1, mobFam, 6, rowsToAdd);
+
+      doBulkload(table1, regionName, famName, mobFam);
+
+      // we need to major compact the regions to make sure there are no references
+      // and the regions are once again splittable
+      TEST_UTIL.compact(true);
+      TEST_UTIL.flush();
+      TEST_UTIL.waitTableAvailable(table1);
+
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) {
+        if (region.isSplittable()) {
+          admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get();
+        }
+      }
+
+      request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
+      incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+
+      preRestoreBackupFiles = getBackupFiles();
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId,
+        false, fromTables, toTables, true, true));
+      postRestoreBackupFiles = getBackupFiles();
+
+      Assert.assertEquals(preRestoreBackupFiles, postRestoreBackupFiles);
+
+      // The restored table must end up with exactly as many rows as the source table.
+      int rowsExpected = TEST_UTIL.countRows(table1);
+      int rowsActual = TEST_UTIL.countRows(table1_restore);
+
+      Assert.assertEquals(rowsExpected, rowsActual);
+    }
+  }
+
   private void checkThrowsCFMismatch(IOException ex, List<TableName> tables) {
     Throwable cause = Throwables.getRootCause(ex);
     assertEquals(cause.getClass(), ColumnFamilyMismatchException.class);
@@ -253,6 +391,32 @@ public class TestIncrementalBackup extends TestBackupBase {
     return backupId;
   }
 
+  /**
+   * Creates HFiles for the given column families via {@code createHFiles} and bulk loads the
+   * returned directory into {@code tn}, asserting that the load result is not empty.
+   */
+  private static void doBulkload(TableName tn, String regionName, byte[]... fams)
+    throws IOException {
+    Path hfileParent = createHFiles(tn, regionName, fams);
+    BulkLoadHFiles loader = BulkLoadHFiles.create(conf1);
+    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tn, hfileParent);
+    assertFalse(loaded.isEmpty());
+  }
+
+  /**
+   * Writes one HFile per column family below the directory of the given region and returns that
+   * region directory. Each file is placed at a randomly named (UUID) path inside its family
+   * directory and is created with {@code BULKLOAD_START_KEY}, {@code BULKLOAD_END_KEY} and 1000
+   * as the trailing arguments of {@code HFileTestUtil.createHFile}.
+   */
+  private static Path createHFiles(TableName tn, String regionName, byte[]... fams)
+    throws IOException {
+    Path hbaseRoot = CommonFSUtils.getRootDir(conf1);
+    Path regionPath = CommonFSUtils.getRegionDir(hbaseRoot, tn, regionName);
+
+    FileSystem fileSystem = FileSystem.get(TEST_UTIL.getConfiguration());
+    fileSystem.mkdirs(hbaseRoot);
+
+    for (int i = 0; i < fams.length; i++) {
+      byte[] family = fams[i];
+      Path hfilePath =
+        new Path(new Path(regionPath, Bytes.toString(family)), UUID.randomUUID().toString());
+      HFileTestUtil.createHFile(conf1, fileSystem, hfilePath, family, qualName,
+        BULKLOAD_START_KEY, BULKLOAD_END_KEY, 1000);
+    }
+
+    return regionPath;
+  }
+
   /**
    * Check that backup manifest can be produced for a different root. Users 
may want to move
    * existing backups to a different location.
@@ -269,4 +433,16 @@ public class TestIncrementalBackup extends TestBackupBase {
       assertEquals(anotherRootDir, ancestor.getRootDir());
     }
   }
+
+  /**
+   * Recursively lists every file under {@code BACKUP_ROOT_DIR} on the test file system.
+   * @return the listed files, in the order the file system iterator produced them
+   */
+  private List<LocatedFileStatus> getBackupFiles() throws IOException {
+    Path backupRoot = new Path(BACKUP_ROOT_DIR);
+    List<LocatedFileStatus> backupFiles = new ArrayList<>();
+
+    for (RemoteIterator<LocatedFileStatus> listing =
+      TEST_UTIL.getTestFileSystem().listFiles(backupRoot, true); listing.hasNext();) {
+      backupFiles.add(listing.next());
+    }
+
+    return backupFiles;
+  }
 }
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 696e5257244..e06300848f6 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
@@ -379,7 +380,7 @@ public class WALPlayer extends Configured implements Tool {
         List<TableInfo> tableInfoList = new ArrayList<>();
         for (TableName tableName : tableNames) {
           Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName);
+          RegionLocator regionLocator = getRegionLocator(tableName, conf, 
conn);
           tableInfoList.add(new TableInfo(table.getDescriptor(), 
regionLocator));
         }
         if (multiTableSupport) {
@@ -475,4 +476,13 @@ public class WALPlayer extends Configured implements Tool {
     Job job = createSubmittableJob(args);
     return job.waitForCompletion(true) ? 0 : 1;
   }
+
+  /**
+   * Chooses the {@link RegionLocator} for a table: a {@link SnapshotRegionLocator} when
+   * {@code SnapshotRegionLocator.shouldUseSnapshotRegionLocator} returns true for this
+   * configuration and table, otherwise the locator obtained from the connection.
+   */
+  private static RegionLocator getRegionLocator(TableName tableName, Configuration conf,
+    Connection conn) throws IOException {
+    boolean useSnapshotSplits =
+      SnapshotRegionLocator.shouldUseSnapshotRegionLocator(conf, tableName);
+    return useSnapshotSplits
+      ? SnapshotRegionLocator.create(conf, tableName)
+      : conn.getRegionLocator(tableName);
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java
new file mode 100644
index 00000000000..5c90dd900f7
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotRegionLocator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+
+/**
+ * {@link RegionLocator} built using the most recent full backup's snapshot manifest for a given
+ * table. Useful for aligning any subsequent incremental backups along the same splits as the full
+ * backup.
+ */
+@InterfaceAudience.Private
+public final class SnapshotRegionLocator implements RegionLocator {
+
+  /** Prefix of the per-table configuration key that holds the snapshot manifest directory. */
+  private static final String SNAPSHOT_MANIFEST_DIR_PREFIX =
+    "region.locator.snapshot.manifest.dir.";
+
+  /**
+   * Placeholder passed to the {@link HRegionLocation} constructor. It is never handed out because
+   * the server accessors of {@link SnapshotHRegionLocation} throw.
+   */
+  private static final ServerName FAKE_SERVER_NAME =
+    ServerName.parseServerName("www.example.net,1234,1212121212");
+
+  private final TableName tableName;
+
+  /** Region replicas keyed by region start key, ordered by {@link Bytes#BYTES_COMPARATOR}. */
+  private final TreeMap<byte[], HRegionReplicas> regions;
+
+  /** Every location read from the manifest, in the order the manifest returned them. */
+  private final List<HRegionLocation> rawLocations;
+
+  /**
+   * Builds a locator from the snapshot manifest stored in the directory that was registered for
+   * {@code table} with {@link #setSnapshotManifestDir(Configuration, String, TableName)}. The
+   * locator's table name is taken from the manifest's table descriptor.
+   * @param conf  configuration holding the manifest directory for {@code table}
+   * @param table table whose manifest directory is looked up
+   * @return a locator over the regions recorded in the snapshot manifest
+   * @throws IOException              if the snapshot info or the manifest cannot be read
+   * @throws IllegalArgumentException if no manifest directory is configured for {@code table}
+   */
+  public static SnapshotRegionLocator create(Configuration conf, TableName table)
+    throws IOException {
+    String manifestDir = conf.get(getSnapshotManifestDirKey(table));
+    if (manifestDir == null) {
+      // Fail with a message naming the table instead of the generic error Path would raise.
+      throw new IllegalArgumentException(
+        "No snapshot manifest directory configured for table " + table);
+    }
+
+    Path workingDir = new Path(manifestDir);
+    FileSystem fs = workingDir.getFileSystem(conf);
+    SnapshotProtos.SnapshotDescription desc =
+      SnapshotDescriptionUtils.readSnapshotInfo(fs, workingDir);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, workingDir, desc);
+
+    TableName tableName = manifest.getTableDescriptor().getTableName();
+    TreeMap<byte[], HRegionReplicas> replicas = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    List<HRegionLocation> rawLocations = new ArrayList<>();
+
+    for (SnapshotProtos.SnapshotRegionManifest region : manifest.getRegionManifests()) {
+      HBaseProtos.RegionInfo ri = region.getRegionInfo();
+      byte[] key = ri.getStartKey().toByteArray();
+
+      SnapshotHRegionLocation location = toLocation(ri, tableName);
+      rawLocations.add(location);
+
+      // Locations sharing a start key are grouped as replicas of the same region.
+      HRegionReplicas hrr = replicas.get(key);
+      if (hrr == null) {
+        replicas.put(key, new HRegionReplicas(location));
+      } else {
+        hrr.addReplica(location);
+      }
+    }
+
+    return new SnapshotRegionLocator(tableName, replicas, rawLocations);
+  }
+
+  private SnapshotRegionLocator(TableName tableName, TreeMap<byte[], HRegionReplicas> regions,
+    List<HRegionLocation> rawLocations) {
+    this.tableName = tableName;
+    this.regions = regions;
+    this.rawLocations = rawLocations;
+  }
+
+  /**
+   * Finds the region whose start key is the greatest one not after {@code row} and returns the
+   * requested replica of it. {@code reload} is ignored.
+   * @return the location of the replica, or {@code null} if that replica id was not in the manifest
+   * @throws IOException if no region in the manifest starts at or before {@code row}
+   */
+  @Override
+  public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload)
+    throws IOException {
+    Map.Entry<byte[], HRegionReplicas> entry = regions.floorEntry(row);
+    if (entry == null) {
+      // floorEntry returns null when the map is empty or every start key sorts after the row.
+      throw new IOException("No region found in snapshot manifest of table " + tableName
+        + " for row " + Bytes.toStringBinary(row));
+    }
+    return entry.getValue().getReplica(replicaId);
+  }
+
+  /** Returns a single-element list holding the result of {@code getRegionLocation(row, reload)}. */
+  @Override
+  public List<HRegionLocation> getRegionLocations(byte[] row, boolean reload) throws IOException {
+    return Collections.singletonList(getRegionLocation(row, reload));
+  }
+
+  /** No-op: this locator keeps no cache. */
+  @Override
+  public void clearRegionLocationCache() {
+
+  }
+
+  /** Returns every location read from the manifest, replicas included. */
+  @Override
+  public List<HRegionLocation> getAllRegionLocations() throws IOException {
+    return rawLocations;
+  }
+
+  @Override
+  public TableName getName() {
+    return tableName;
+  }
+
+  /** No-op: there is nothing to release. */
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  /** Returns true when a snapshot manifest directory has been configured for {@code table}. */
+  public static boolean shouldUseSnapshotRegionLocator(Configuration conf, TableName table) {
+    return conf.get(getSnapshotManifestDirKey(table)) != null;
+  }
+
+  /** Stores {@code dir} in {@code conf} as the snapshot manifest directory of {@code table}. */
+  public static void setSnapshotManifestDir(Configuration conf, String dir, TableName table) {
+    conf.set(getSnapshotManifestDirKey(table), dir);
+  }
+
+  private static String getSnapshotManifestDirKey(TableName table) {
+    return SNAPSHOT_MANIFEST_DIR_PREFIX + table.getNameWithNamespaceInclAsString();
+  }
+
+  /** Converts a protobuf region info into a location for {@code tableName}. */
+  private static SnapshotHRegionLocation toLocation(HBaseProtos.RegionInfo ri,
+    TableName tableName) {
+    RegionInfo region = RegionInfoBuilder.newBuilder(tableName)
+      .setStartKey(ri.getStartKey().toByteArray()).setEndKey(ri.getEndKey().toByteArray())
+      .setRegionId(ri.getRegionId()).setReplicaId(ri.getReplicaId()).build();
+
+    return new SnapshotHRegionLocation(region);
+  }
+
+  /** The locations that share one start key, indexed by replica id. */
+  private static final class HRegionReplicas {
+    private final Map<Integer, SnapshotHRegionLocation> replicas = new HashMap<>();
+
+    private HRegionReplicas(SnapshotHRegionLocation region) {
+      addReplica(region);
+    }
+
+    private void addReplica(SnapshotHRegionLocation replica) {
+      this.replicas.put(replica.getRegion().getReplicaId(), replica);
+    }
+
+    private HRegionLocation getReplica(int replicaId) {
+      return replicas.get(replicaId);
+    }
+  }
+
+  /**
+   * {@link HRegionLocation} that only carries region information. Its server name, host name and
+   * port accessors throw {@link NotImplementedException}; hashing and ordering use the region.
+   */
+  public static final class SnapshotHRegionLocation extends HRegionLocation {
+
+    public SnapshotHRegionLocation(RegionInfo regionInfo) {
+      super(regionInfo, FAKE_SERVER_NAME);
+    }
+
+    @Override
+    public ServerName getServerName() {
+      throw new NotImplementedException("SnapshotHRegionLocation doesn't have a server name");
+    }
+
+    @Override
+    public String getHostname() {
+      throw new NotImplementedException("SnapshotHRegionLocation doesn't have a host name");
+    }
+
+    @Override
+    public int getPort() {
+      throw new NotImplementedException("SnapshotHRegionLocation doesn't have a port");
+    }
+
+    @Override
+    public int hashCode() {
+      return this.getRegion().hashCode();
+    }
+
+    // Delegates to HRegionLocation#equals; overridden alongside hashCode.
+    @Override
+    public boolean equals(Object o) {
+      return super.equals(o);
+    }
+
+    @Override
+    public int compareTo(HRegionLocation o) {
+      return this.getRegion().compareTo(o.getRegion());
+    }
+  }
+}

Reply via email to