ndimiduk commented on code in PR #6370:
URL: https://github.com/apache/hbase/pull/6370#discussion_r1922215714


##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();

Review Comment:
   please use try-with-resources for these two



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);
+
+    LOG.debug("After incremental restore: " + hTable.getDescriptor());
+    int countFamName = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countFamName + " rows");
+    Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
+
+    int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
+    LOG.debug("f2 has " + countFam2Name + " rows");
+    Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
+
+    int countMobName = TEST_UTIL.countRows(hTable, mobName);
+    LOG.debug("mob has " + countMobName + " rows");
+    Assert.assertEquals(countMobName, NB_ROWS_MOB);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);
+
+    LOG.debug("After incremental restore: " + hTable.getDescriptor());
+    int countFamName = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countFamName + " rows");
+    Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
+
+    int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
+    LOG.debug("f2 has " + countFam2Name + " rows");
+    Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
+
+    int countMobName = TEST_UTIL.countRows(hTable, mobName);
+    LOG.debug("mob has " + countMobName + " rows");
+    Assert.assertEquals(countMobName, NB_ROWS_MOB);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+    admin.close();
+  }
+
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
     try (Connection conn = ConnectionFactory.createConnection(conf1)) {
-      int NB_ROWS_FAM3 = 6;
-      insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
-      Admin admin = conn.getAdmin();
-      BackupAdminImpl client = new BackupAdminImpl(conn);
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
       BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdFull = takeFullBackup(tables, client);
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
-      assertTrue(checkSucceeded(backupIdFull));
-
-      // #2 - insert some data to table
-      Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
-      LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
-      LOG.debug("written " + ADD_ROWS + " rows to " + table1);
-      // additionally, insert rows to MOB cf
-      int NB_ROWS_MOB = 111;
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
-      LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
-      t1.close();
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
-      Table t2 = conn.getTable(table2);
-      Put p2;
-      for (int i = 0; i < 5; i++) {
-        p2 = new Put(Bytes.toBytes("row-t2" + i));
-        p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t2.put(p2);
-      }
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(t2));
-      t2.close();
-      LOG.debug("written " + 5 + " rows to " + table2);
-      // split table1
-      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      List<HRegion> regions = cluster.getRegions(table1);
-      byte[] name = regions.get(0).getRegionInfo().getRegionName();
-      long startSplitTime = EnvironmentEdgeManager.currentTime();
-      try {
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);
+
+    LOG.debug("After incremental restore: " + hTable.getDescriptor());
+    int countFamName = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countFamName + " rows");
+    Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
+
+    int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
+    LOG.debug("f2 has " + countFam2Name + " rows");
+    Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
+
+    int countMobName = TEST_UTIL.countRows(hTable, mobName);
+    LOG.debug("mob has " + countMobName + " rows");
+    Assert.assertEquals(countMobName, NB_ROWS_MOB);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+    admin.close();
+  }
+
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
     try (Connection conn = ConnectionFactory.createConnection(conf1)) {
-      int NB_ROWS_FAM3 = 6;
-      insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
-      Admin admin = conn.getAdmin();
-      BackupAdminImpl client = new BackupAdminImpl(conn);
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
       BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdFull = takeFullBackup(tables, client);
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
-      assertTrue(checkSucceeded(backupIdFull));
-
-      // #2 - insert some data to table
-      Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
-      LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
-      LOG.debug("written " + ADD_ROWS + " rows to " + table1);
-      // additionally, insert rows to MOB cf
-      int NB_ROWS_MOB = 111;
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
-      LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
-      t1.close();
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
-      Table t2 = conn.getTable(table2);
-      Put p2;
-      for (int i = 0; i < 5; i++) {
-        p2 = new Put(Bytes.toBytes("row-t2" + i));
-        p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t2.put(p2);
-      }
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(t2));
-      t2.close();
-      LOG.debug("written " + 5 + " rows to " + table2);
-      // split table1
-      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      List<HRegion> regions = cluster.getRegions(table1);
-      byte[] name = regions.get(0).getRegionInfo().getRegionName();
-      long startSplitTime = EnvironmentEdgeManager.currentTime();
-      try {
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);
+      Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(table));
+
+      int ROWS_TO_ADD = 1_000;
+      // different IDs so that rows don't overlap
+      insertIntoTable(conn, table1, fam1, 3, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD);
+
+      Admin admin = conn.getAdmin();
+      List<HRegion> currentRegions = 
TEST_UTIL.getHBaseCluster().getRegions(table1);
+      for (HRegion region : currentRegions) {
+        byte[] name = region.getRegionInfo().getEncodedNameAsBytes();
         admin.splitRegionAsync(name).get();
-      } catch (Exception e) {
-        // although split fail, this may not affect following check in current 
API,
-        // exception will be thrown.
-        LOG.debug("region is not splittable, because " + e);
       }
+
       TEST_UTIL.waitTableAvailable(table1);
-      long endSplitTime = EnvironmentEdgeManager.currentTime();
-      // split finished
-      LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
 
-      // #3 - incremental backup for multiple tables
-      tables = Lists.newArrayList(table1, table2);
+      // Make sure we've split regions
+      assertNotEquals(currentRegions, 
TEST_UTIL.getHBaseCluster().getRegions(table1));
+
       request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdIncMultiple = client.backupTables(request);
-      assertTrue(checkSucceeded(backupIdIncMultiple));
-      BackupManifest manifest =
-        HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
-      assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
-
-      // add column family f2 to table1
-      // drop column family f3
-      final byte[] fam2Name = Bytes.toBytes("f2");
-      newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
-        
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
-        .build();
-      TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
-
-      // check that an incremental backup fails because the CFs don't match
-      final List<TableName> tablesCopy = tables;
-      IOException ex = assertThrows(IOException.class, () -> client
-        .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
-      checkThrowsCFMismatch(ex, List.of(table1));
-      takeFullBackup(tables, client);
-
-      int NB_ROWS_FAM2 = 7;
-      Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
-      t3.close();
-
-      // Wait for 5 sec to make sure that old WALs were deleted
-      Thread.sleep(5000);
-
-      // #4 - additional incremental backup for multiple tables
+      String incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+      Assert.assertEquals(NB_ROWS_IN_BATCH + ROWS_TO_ADD + ROWS_TO_ADD,
+        HBaseTestingUtil.countRows(table));
+
+      // test bulkloads
+      HRegion regionToBulkload = 
TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+      String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+
+      insertIntoTable(conn, table1, fam1, 5, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 6, ROWS_TO_ADD);
+
+      doBulkload(table1, regionName, famName, mobFam);
+
+      // we need to major compact the regions to make sure there are no 
references
+      // and the regions are once again splittable
+      TEST_UTIL.compact(true);
+      TEST_UTIL.flush();
+      TEST_UTIL.waitTableAvailable(table1);
+
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) {
+        if (region.isSplittable()) {
+          
admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get();
+        }
+      }
+
       request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdIncMultiple2 = client.backupTables(request);
-      assertTrue(checkSucceeded(backupIdIncMultiple2));
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
-
-      // #5 - restore full backup for all tables
-      TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
-      TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
-
-      LOG.debug("Restoring full " + backupIdFull);
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
-        tablesRestoreFull, tablesMapFull, true));
-
-      // #6.1 - check tables for full restore
-      Admin hAdmin = TEST_UTIL.getAdmin();
-      assertTrue(hAdmin.tableExists(table1_restore));
-      assertTrue(hAdmin.tableExists(table2_restore));
-      hAdmin.close();
-
-      // #6.2 - checking row count of tables for full restore
-      Table hTable = conn.getTable(table1_restore);
-      Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH 
+ NB_ROWS_FAM3);
-      hTable.close();
-
-      hTable = conn.getTable(table2_restore);
-      Assert.assertEquals(NB_ROWS_IN_BATCH, 
HBaseTestingUtil.countRows(hTable));
-      hTable.close();
-
-      // #7 - restore incremental backup for multiple tables, with overwrite
-      TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 
};
-      TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
-        tablesRestoreIncMultiple, tablesMapIncMultiple, true));
-      hTable = conn.getTable(table1_restore);
-
-      LOG.debug("After incremental restore: " + hTable.getDescriptor());
-      int countFamName = TEST_UTIL.countRows(hTable, famName);
-      LOG.debug("f1 has " + countFamName + " rows");
-      Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
-
-      int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
-      LOG.debug("f2 has " + countFam2Name + " rows");
-      Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
-
-      int countMobName = TEST_UTIL.countRows(hTable, mobName);
-      LOG.debug("mob has " + countMobName + " rows");
-      Assert.assertEquals(countMobName, NB_ROWS_MOB);
-      hTable.close();
-
-      hTable = conn.getTable(table2_restore);
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
-      hTable.close();
+      incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+
+      table = conn.getTable(table1);

Review Comment:
   please use try-with-resources
   
   Actually, what does this even do? You open the table object and close it 
immediately. If you're fishing for the absence of an exception, can you make 
that more clear?



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);

Review Comment:
   Is there a side-effect that you can query to know when this is finished? 
It'll be more robust than a blind wait. We have 
`HBaseCommonTestingUtil#waitFor` and the underlying `Waiter` class offers a 
quite flexible API.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -81,6 +94,34 @@ public static Collection<Object[]> data() {
   public TestIncrementalBackup(Boolean b) {
   }
 
+  @After
+  public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
+    TEST_UTIL.flush(table1);
+    TEST_UTIL.flush(table2);
+    TEST_UTIL.flush(table1_restore);
+
+    TEST_UTIL.truncateTable(table1).close();

Review Comment:
   please use try-with-resources for these three.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -81,6 +94,34 @@ public static Collection<Object[]> data() {
   public TestIncrementalBackup(Boolean b) {
   }
 
+  @After
+  public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
+    TEST_UTIL.flush(table1);
+    TEST_UTIL.flush(table2);
+    TEST_UTIL.flush(table1_restore);
+
+    TEST_UTIL.truncateTable(table1).close();
+    TEST_UTIL.truncateTable(table2).close();
+    TEST_UTIL.truncateTable(table1_restore).close();
+
+    TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
+      try {
+        LogRoller walRoller = rst.getRegionServer().getWalRoller();
+        walRoller.requestRollAll();
+        walRoller.waitUntilWalRollFinished();
+      } catch (Exception ignored) {
+      }
+    });
+
+    Table table = TEST_UTIL.getConnection().getTable(table1);

Review Comment:
   please use try-with-resources for these two.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();

Review Comment:
   The Javadoc on this method says that it is managed for you. Reading the 
code, I'm a bit surprised that closing it doesn't result in errors later on.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);
+
+    LOG.debug("After incremental restore: " + hTable.getDescriptor());
+    int countFamName = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countFamName + " rows");
+    Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
+
+    int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
+    LOG.debug("f2 has " + countFam2Name + " rows");
+    Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
+
+    int countMobName = TEST_UTIL.countRows(hTable, mobName);
+    LOG.debug("mob has " + countMobName + " rows");
+    Assert.assertEquals(countMobName, NB_ROWS_MOB);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+    admin.close();
+  }
+
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
     try (Connection conn = ConnectionFactory.createConnection(conf1)) {
-      int NB_ROWS_FAM3 = 6;
-      insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
-      Admin admin = conn.getAdmin();
-      BackupAdminImpl client = new BackupAdminImpl(conn);
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
       BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdFull = takeFullBackup(tables, client);
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
-      assertTrue(checkSucceeded(backupIdFull));
-
-      // #2 - insert some data to table
-      Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
-      LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
-      LOG.debug("written " + ADD_ROWS + " rows to " + table1);
-      // additionally, insert rows to MOB cf
-      int NB_ROWS_MOB = 111;
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
-      LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
-      t1.close();
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
-      Table t2 = conn.getTable(table2);
-      Put p2;
-      for (int i = 0; i < 5; i++) {
-        p2 = new Put(Bytes.toBytes("row-t2" + i));
-        p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t2.put(p2);
-      }
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(t2));
-      t2.close();
-      LOG.debug("written " + 5 + " rows to " + table2);
-      // split table1
-      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      List<HRegion> regions = cluster.getRegions(table1);
-      byte[] name = regions.get(0).getRegionInfo().getRegionName();
-      long startSplitTime = EnvironmentEdgeManager.currentTime();
-      try {
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);
+      Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(table));
+
+      int ROWS_TO_ADD = 1_000;
+      // different IDs so that rows don't overlap
+      insertIntoTable(conn, table1, fam1, 3, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD);
+
+      Admin admin = conn.getAdmin();

Review Comment:
   When using the `Admin` accessed from the `Connection` instance, you need to 
close it. When using the `Admin` from the testing utility, you need to not 
close it.
   
   I know.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);
+
+    LOG.debug("After incremental restore: " + hTable.getDescriptor());
+    int countFamName = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countFamName + " rows");
+    Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
+
+    int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
+    LOG.debug("f2 has " + countFam2Name + " rows");
+    Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
+
+    int countMobName = TEST_UTIL.countRows(hTable, mobName);
+    LOG.debug("mob has " + countMobName + " rows");
+    Assert.assertEquals(countMobName, NB_ROWS_MOB);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+    admin.close();
+  }
+
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
     try (Connection conn = ConnectionFactory.createConnection(conf1)) {
-      int NB_ROWS_FAM3 = 6;
-      insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
-      Admin admin = conn.getAdmin();
-      BackupAdminImpl client = new BackupAdminImpl(conn);
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
       BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdFull = takeFullBackup(tables, client);
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
-      assertTrue(checkSucceeded(backupIdFull));
-
-      // #2 - insert some data to table
-      Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
-      LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
-      LOG.debug("written " + ADD_ROWS + " rows to " + table1);
-      // additionally, insert rows to MOB cf
-      int NB_ROWS_MOB = 111;
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
-      LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
-      t1.close();
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
-      Table t2 = conn.getTable(table2);
-      Put p2;
-      for (int i = 0; i < 5; i++) {
-        p2 = new Put(Bytes.toBytes("row-t2" + i));
-        p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t2.put(p2);
-      }
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(t2));
-      t2.close();
-      LOG.debug("written " + 5 + " rows to " + table2);
-      // split table1
-      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      List<HRegion> regions = cluster.getRegions(table1);
-      byte[] name = regions.get(0).getRegionInfo().getRegionName();
-      long startSplitTime = EnvironmentEdgeManager.currentTime();
-      try {
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);
+      Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(table));
+
+      int ROWS_TO_ADD = 1_000;
+      // different IDs so that rows don't overlap
+      insertIntoTable(conn, table1, fam1, 3, ROWS_TO_ADD);

Review Comment:
   please use try-with-resources for these two



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -235,6 +277,92 @@ public void TestIncBackupRestore() throws Exception {
     }
   }
 
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf1)) {
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
+      BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);
+      Assert.assertEquals(HBaseTestingUtil.countRows(table), NB_ROWS_IN_BATCH);
+
+      int ROWS_TO_ADD = 1_000;
+      // different IDs so that rows don't overlap
+      insertIntoTable(conn, table1, fam1, 3, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD);
+
+      Admin admin = conn.getAdmin();
+      List<HRegion> currentRegions = 
TEST_UTIL.getHBaseCluster().getRegions(table1);
+      for (HRegion region : currentRegions) {
+        byte[] name = region.getRegionInfo().getEncodedNameAsBytes();
+        admin.splitRegionAsync(name).get();
+      }
+
+      TEST_UTIL.waitTableAvailable(table1);
+
+      // Make sure we've split regions
+      assertNotEquals(currentRegions, 
TEST_UTIL.getHBaseCluster().getRegions(table1));
+
+      request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+      String incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+      Assert.assertEquals(HBaseTestingUtil.countRows(table),
+        NB_ROWS_IN_BATCH + ROWS_TO_ADD + ROWS_TO_ADD);
+
+      // test bulkloads
+      HRegion regionToBulkload = 
TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+      String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+
+      insertIntoTable(conn, table1, fam1, 5, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 6, ROWS_TO_ADD);
+
+      doBulkload(table1, regionName, famName, mobFam);
+
+      // we need to major compact the regions to make sure there are no 
references
+      // and the regions are once again splittable
+      TEST_UTIL.compact(true);
+      TEST_UTIL.flush();
+      TEST_UTIL.waitTableAvailable(table1);
+
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) {
+        if (region.isSplittable()) {
+          
admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get();
+        }
+      }
+
+      request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+      incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+
+      table = conn.getTable(table1);
+      int rowsExpected = HBaseTestingUtil.countRows(table, famName, mobFam);
+      table = conn.getTable(table1_restore);
+
+      Assert.assertEquals(HBaseTestingUtil.countRows(table, famName, mobFam), 
rowsExpected);
+    }

Review Comment:
   It would be nice to assert explicitly between runs, or to have a test that 
specifically checks for this ... it's a pretty important invariant.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -100,142 +141,227 @@ public void TestIncBackupRestore() throws Exception {
       .build();
     TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
 
+    Connection conn = TEST_UTIL.getConnection();
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
+    Admin admin = conn.getAdmin();
+    BackupAdminImpl client = new BackupAdminImpl(conn);
+    String backupIdFull = takeFullBackup(tables, client);
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
+    assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert some data to table
+    Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+    LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_FAM3);
+    LOG.debug("written " + ADD_ROWS + " rows to " + table1);
+    // additionally, insert rows to MOB cf
+    int NB_ROWS_MOB = 111;
+    insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
+    LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
+    t1.close();
+    Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + 
ADD_ROWS + NB_ROWS_MOB);
+    Table t2 = conn.getTable(table2);
+    Put p2;
+    for (int i = 0; i < 5; i++) {
+      p2 = new Put(Bytes.toBytes("row-t2" + i));
+      p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+      t2.put(p2);
+    }
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtil.countRows(t2));
+    t2.close();
+    LOG.debug("written " + 5 + " rows to " + table2);
+    // split table1
+    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    List<HRegion> regions = cluster.getRegions(table1);
+    byte[] name = regions.get(0).getRegionInfo().getRegionName();
+    long startSplitTime = EnvironmentEdgeManager.currentTime();
+    try {
+      admin.splitRegionAsync(name).get();
+    } catch (Exception e) {
+      // although split fail, this may not affect following check in current 
API,
+      // exception will be thrown.
+      LOG.debug("region is not splittable, because " + e);
+    }
+    TEST_UTIL.waitTableAvailable(table1);
+    long endSplitTime = EnvironmentEdgeManager.currentTime();
+    // split finished
+    LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
+
+    // #3 - incremental backup for multiple tables
+    tables = Lists.newArrayList(table1, table2);
+    BackupRequest request = createBackupRequest(BackupType.INCREMENTAL, 
tables, BACKUP_ROOT_DIR);
+    String backupIdIncMultiple = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple));
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
+    assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
+
+    // add column family f2 to table1
+    // drop column family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
+      .build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    // check that an incremental backup fails because the CFs don't match
+    final List<TableName> tablesCopy = tables;
+    IOException ex = assertThrows(IOException.class, () -> client
+      .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
+    checkThrowsCFMismatch(ex, List.of(table1));
+    takeFullBackup(tables, client);
+
+    int NB_ROWS_FAM2 = 7;
+    Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // Wait for 5 sec to make sure that old WALs were deleted
+    Thread.sleep(5000);
+
+    // #4 - additional incremental backup for multiple tables
+    request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = client.backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+    validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
+
+    // #5 - restore full backup for all tables
+    TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
+    TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
+
+    LOG.debug("Restoring full " + backupIdFull);
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
+      tablesRestoreFull, tablesMapFull, true));
+
+    // #6.1 - check tables for full restore
+    Admin hAdmin = TEST_UTIL.getAdmin();
+    assertTrue(hAdmin.tableExists(table1_restore));
+    assertTrue(hAdmin.tableExists(table2_restore));
+    hAdmin.close();
+
+    // #6.2 - checking row count of tables for full restore
+    Table hTable = conn.getTable(table1_restore);
+    Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 
NB_ROWS_FAM3);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+
+    // #7 - restore incremental backup for multiple tables, with overwrite
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
+    TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
+    client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
+      tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+    hTable = conn.getTable(table1_restore);
+
+    LOG.debug("After incremental restore: " + hTable.getDescriptor());
+    int countFamName = TEST_UTIL.countRows(hTable, famName);
+    LOG.debug("f1 has " + countFamName + " rows");
+    Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
+
+    int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
+    LOG.debug("f2 has " + countFam2Name + " rows");
+    Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
+
+    int countMobName = TEST_UTIL.countRows(hTable, mobName);
+    LOG.debug("mob has " + countMobName + " rows");
+    Assert.assertEquals(countMobName, NB_ROWS_MOB);
+    hTable.close();
+
+    hTable = conn.getTable(table2_restore);
+    Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
+    hTable.close();
+    admin.close();
+  }
+
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
     try (Connection conn = ConnectionFactory.createConnection(conf1)) {
-      int NB_ROWS_FAM3 = 6;
-      insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_FAM3).close();
-      Admin admin = conn.getAdmin();
-      BackupAdminImpl client = new BackupAdminImpl(conn);
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
       BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdFull = takeFullBackup(tables, client);
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdFull);
-      assertTrue(checkSucceeded(backupIdFull));
-
-      // #2 - insert some data to table
-      Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
-      LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
-      LOG.debug("written " + ADD_ROWS + " rows to " + table1);
-      // additionally, insert rows to MOB cf
-      int NB_ROWS_MOB = 111;
-      insertIntoTable(conn, table1, mobName, 3, NB_ROWS_MOB);
-      LOG.debug("written " + NB_ROWS_MOB + " rows to " + table1 + " to Mob 
enabled CF");
-      t1.close();
-      Assert.assertEquals(HBaseTestingUtil.countRows(t1),
-        NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
-      Table t2 = conn.getTable(table2);
-      Put p2;
-      for (int i = 0; i < 5; i++) {
-        p2 = new Put(Bytes.toBytes("row-t2" + i));
-        p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        t2.put(p2);
-      }
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(t2));
-      t2.close();
-      LOG.debug("written " + 5 + " rows to " + table2);
-      // split table1
-      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-      List<HRegion> regions = cluster.getRegions(table1);
-      byte[] name = regions.get(0).getRegionInfo().getRegionName();
-      long startSplitTime = EnvironmentEdgeManager.currentTime();
-      try {
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);
+      Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtil.countRows(table));
+
+      int ROWS_TO_ADD = 1_000;
+      // different IDs so that rows don't overlap
+      insertIntoTable(conn, table1, fam1, 3, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD);
+
+      Admin admin = conn.getAdmin();
+      List<HRegion> currentRegions = 
TEST_UTIL.getHBaseCluster().getRegions(table1);
+      for (HRegion region : currentRegions) {
+        byte[] name = region.getRegionInfo().getEncodedNameAsBytes();
         admin.splitRegionAsync(name).get();
-      } catch (Exception e) {
-        // although split fail, this may not affect following check in current 
API,
-        // exception will be thrown.
-        LOG.debug("region is not splittable, because " + e);
       }
+
       TEST_UTIL.waitTableAvailable(table1);
-      long endSplitTime = EnvironmentEdgeManager.currentTime();
-      // split finished
-      LOG.debug("split finished in =" + (endSplitTime - startSplitTime));
 
-      // #3 - incremental backup for multiple tables
-      tables = Lists.newArrayList(table1, table2);
+      // Make sure we've split regions
+      assertNotEquals(currentRegions, 
TEST_UTIL.getHBaseCluster().getRegions(table1));
+
       request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdIncMultiple = client.backupTables(request);
-      assertTrue(checkSucceeded(backupIdIncMultiple));
-      BackupManifest manifest =
-        HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR), 
backupIdIncMultiple);
-      assertEquals(Sets.newHashSet(table1, table2), new 
HashSet<>(manifest.getTableList()));
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple);
-
-      // add column family f2 to table1
-      // drop column family f3
-      final byte[] fam2Name = Bytes.toBytes("f2");
-      newTable1Desc = TableDescriptorBuilder.newBuilder(newTable1Desc)
-        
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2Name)).removeColumnFamily(fam3Name)
-        .build();
-      TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
-
-      // check that an incremental backup fails because the CFs don't match
-      final List<TableName> tablesCopy = tables;
-      IOException ex = assertThrows(IOException.class, () -> client
-        .backupTables(createBackupRequest(BackupType.INCREMENTAL, tablesCopy, 
BACKUP_ROOT_DIR)));
-      checkThrowsCFMismatch(ex, List.of(table1));
-      takeFullBackup(tables, client);
-
-      int NB_ROWS_FAM2 = 7;
-      Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
-      t3.close();
-
-      // Wait for 5 sec to make sure that old WALs were deleted
-      Thread.sleep(5000);
-
-      // #4 - additional incremental backup for multiple tables
+      String incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+      Assert.assertEquals(NB_ROWS_IN_BATCH + ROWS_TO_ADD + ROWS_TO_ADD,
+        HBaseTestingUtil.countRows(table));
+
+      // test bulkloads
+      HRegion regionToBulkload = 
TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+      String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+
+      insertIntoTable(conn, table1, fam1, 5, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 6, ROWS_TO_ADD);
+
+      doBulkload(table1, regionName, famName, mobFam);
+
+      // we need to major compact the regions to make sure there are no 
references
+      // and the regions are once again splittable
+      TEST_UTIL.compact(true);
+      TEST_UTIL.flush();
+      TEST_UTIL.waitTableAvailable(table1);
+
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) {
+        if (region.isSplittable()) {
+          
admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get();
+        }
+      }
+
       request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
-      String backupIdIncMultiple2 = client.backupTables(request);
-      assertTrue(checkSucceeded(backupIdIncMultiple2));
-      validateRootPathCanBeOverridden(BACKUP_ROOT_DIR, backupIdIncMultiple2);
-
-      // #5 - restore full backup for all tables
-      TableName[] tablesRestoreFull = new TableName[] { table1, table2 };
-      TableName[] tablesMapFull = new TableName[] { table1_restore, 
table2_restore };
-
-      LOG.debug("Restoring full " + backupIdFull);
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdFull, false,
-        tablesRestoreFull, tablesMapFull, true));
-
-      // #6.1 - check tables for full restore
-      Admin hAdmin = TEST_UTIL.getAdmin();
-      assertTrue(hAdmin.tableExists(table1_restore));
-      assertTrue(hAdmin.tableExists(table2_restore));
-      hAdmin.close();
-
-      // #6.2 - checking row count of tables for full restore
-      Table hTable = conn.getTable(table1_restore);
-      Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH 
+ NB_ROWS_FAM3);
-      hTable.close();
-
-      hTable = conn.getTable(table2_restore);
-      Assert.assertEquals(NB_ROWS_IN_BATCH, 
HBaseTestingUtil.countRows(hTable));
-      hTable.close();
-
-      // #7 - restore incremental backup for multiple tables, with overwrite
-      TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 
};
-      TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, 
table2_restore };
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backupIdIncMultiple2, false,
-        tablesRestoreIncMultiple, tablesMapIncMultiple, true));
-      hTable = conn.getTable(table1_restore);
-
-      LOG.debug("After incremental restore: " + hTable.getDescriptor());
-      int countFamName = TEST_UTIL.countRows(hTable, famName);
-      LOG.debug("f1 has " + countFamName + " rows");
-      Assert.assertEquals(countFamName, NB_ROWS_IN_BATCH + ADD_ROWS);
-
-      int countFam2Name = TEST_UTIL.countRows(hTable, fam2Name);
-      LOG.debug("f2 has " + countFam2Name + " rows");
-      Assert.assertEquals(countFam2Name, NB_ROWS_FAM2);
-
-      int countMobName = TEST_UTIL.countRows(hTable, mobName);
-      LOG.debug("mob has " + countMobName + " rows");
-      Assert.assertEquals(countMobName, NB_ROWS_MOB);
-      hTable.close();
-
-      hTable = conn.getTable(table2_restore);
-      Assert.assertEquals(NB_ROWS_IN_BATCH + 5, 
HBaseTestingUtil.countRows(hTable));
-      hTable.close();
+      incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+
+      table = conn.getTable(table1);
+      table.close();
+      int rowsExpected = HBaseTestingUtil.countRows(table, famName, mobFam);
+      table = conn.getTable(table1_restore);

Review Comment:
   please use try-with-resources



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -235,6 +277,92 @@ public void TestIncBackupRestore() throws Exception {
     }
   }
 
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf1)) {

Review Comment:
   Bump.



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -235,6 +277,92 @@ public void TestIncBackupRestore() throws Exception {
     }
   }
 
+  @Test
+  public void TestIncBackupRestoreWithOriginalSplits() throws Exception {
+    byte[] fam1 = Bytes.toBytes("f");
+    byte[] mobFam = Bytes.toBytes("mob");
+
+    List<TableName> tables = Lists.newArrayList(table1);
+    TableDescriptor newTable1Desc =
+      
TableDescriptorBuilder.newBuilder(table1Desc).setColumnFamily(ColumnFamilyDescriptorBuilder
+        
.newBuilder(mobFam).setMobEnabled(true).setMobThreshold(5L).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+
+    try (Connection conn = ConnectionFactory.createConnection(conf1)) {
+      BackupAdminImpl backupAdmin = new BackupAdminImpl(conn);
+      BackupRequest request = createBackupRequest(BackupType.FULL, tables, 
BACKUP_ROOT_DIR);
+      String fullBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      TableName[] fromTables = new TableName[] { table1 };
+      TableName[] toTables = new TableName[] { table1_restore };
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
fullBackupId, false,
+        fromTables, toTables, true, true));
+
+      Table table = conn.getTable(table1_restore);
+      Assert.assertEquals(HBaseTestingUtil.countRows(table), NB_ROWS_IN_BATCH);
+
+      int ROWS_TO_ADD = 1_000;
+      // different IDs so that rows don't overlap
+      insertIntoTable(conn, table1, fam1, 3, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 4, ROWS_TO_ADD);
+
+      Admin admin = conn.getAdmin();
+      List<HRegion> currentRegions = 
TEST_UTIL.getHBaseCluster().getRegions(table1);
+      for (HRegion region : currentRegions) {
+        byte[] name = region.getRegionInfo().getEncodedNameAsBytes();
+        admin.splitRegionAsync(name).get();
+      }
+
+      TEST_UTIL.waitTableAvailable(table1);
+
+      // Make sure we've split regions
+      assertNotEquals(currentRegions, 
TEST_UTIL.getHBaseCluster().getRegions(table1));
+
+      request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+      String incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+      Assert.assertEquals(HBaseTestingUtil.countRows(table),
+        NB_ROWS_IN_BATCH + ROWS_TO_ADD + ROWS_TO_ADD);
+
+      // test bulkloads
+      HRegion regionToBulkload = 
TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+      String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+
+      insertIntoTable(conn, table1, fam1, 5, ROWS_TO_ADD);
+      insertIntoTable(conn, table1, mobFam, 6, ROWS_TO_ADD);
+
+      doBulkload(table1, regionName, famName, mobFam);
+
+      // we need to major compact the regions to make sure there are no 
references
+      // and the regions are once again splittable
+      TEST_UTIL.compact(true);
+      TEST_UTIL.flush();
+      TEST_UTIL.waitTableAvailable(table1);
+
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(table1)) {
+        if (region.isSplittable()) {
+          
admin.splitRegionAsync(region.getRegionInfo().getEncodedNameAsBytes()).get();
+        }
+      }
+
+      request = createBackupRequest(BackupType.INCREMENTAL, tables, 
BACKUP_ROOT_DIR);
+      incrementalBackupId = backupAdmin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+
+      backupAdmin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
incrementalBackupId,
+        false, fromTables, toTables, true, true));
+
+      table = conn.getTable(table1);

Review Comment:
   bump



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java:
##########
@@ -249,4 +377,69 @@ private String takeFullBackup(List<TableName> tables, 
BackupAdminImpl backupAdmi
     checkSucceeded(backupId);
     return backupId;
   }
+
+  private static void doBulkload(TableName tn, String regionName, byte[]... 
fams)

Review Comment:
   Were you going to move/merge/refactor these methods as suggested?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to