This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 334accf4ff6 HBASE-28767 Simplify backup bulk-loading code (#6134)
334accf4ff6 is described below

commit 334accf4ff6472d6f3058ab89da499edab5fde73
Author: DieterDP <[email protected]>
AuthorDate: Fri Dec 13 14:43:13 2024 +0100

    HBASE-28767 Simplify backup bulk-loading code (#6134)
    
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 .../hadoop/hbase/backup/BackupHFileCleaner.java    |   3 +-
 .../apache/hadoop/hbase/backup/BackupObserver.java |  48 +++----
 .../hadoop/hbase/backup/impl/BackupManager.java    |   4 +-
 .../hbase/backup/impl/BackupSystemTable.java       | 153 +++++----------------
 .../apache/hadoop/hbase/backup/impl/BulkLoad.java  |  93 +++++++++++++
 .../backup/impl/IncrementalTableBackupClient.java  | 112 ++++++---------
 6 files changed, 197 insertions(+), 216 deletions(-)
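
For context (not part of the patch): a minimal sketch of the simplified flow this change
introduces. The observer hooks record bulk loads in the backup system table, the incremental
backup client reads them back as flat BulkLoad records, and the tracking rows are deleted once
the backup completes. The method below is illustrative only; its arguments are placeholders and
imports are assumed.

    // Sketch: cfToHFilePaths maps column family -> staged hfile paths, as in the observer hooks.
    static void trackBulkLoadsForBackup(Configuration conf, TableName tableName,
      byte[] encodedRegionName, Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
      try (Connection connection = ConnectionFactory.createConnection(conf);
        BackupSystemTable systemTable = new BackupSystemTable(connection)) {
        // BackupObserver now funnels both of its hooks through one registration call.
        systemTable.registerBulkLoad(tableName, encodedRegionName, cfToHFilePaths);

        // IncrementalTableBackupClient later reads flat BulkLoad records instead of nested maps.
        List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(Collections.singletonList(tableName));
        for (BulkLoad bulkLoad : bulkLoads) {
          // copy the file at bulkLoad.getHfilePath() into the backup destination ...
        }

        // After completeBackup(), the patch removes the tracking rows by their stored row keys:
        // backupManager.deleteBulkLoadedRows(Lists.transform(bulkLoads, BulkLoad::getRowKey));
      }
    }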

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
index 619cecaeaaa..d7474997412 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.backup;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -103,7 +104,7 @@ public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abor
       }
 
       try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
-        fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+        fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups());
       } catch (IOException ioe) {
         LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
         return Collections.emptyList();
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 73f97365adb..db8c29c4c0a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hbase.backup;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -64,21 +67,8 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
       LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
       return;
     }
-    try (Connection connection = ConnectionFactory.createConnection(cfg);
-      BackupSystemTable tbl = new BackupSystemTable(connection)) {
-      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
-      RegionInfo info = ctx.getEnvironment().getRegionInfo();
-      TableName tableName = info.getTable();
-      if (!fullyBackedUpTables.contains(tableName)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(tableName + " has not gone thru full backup");
-        }
-        return;
-      }
-      tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
-    } catch (IOException ioe) {
-      LOG.error("Failed to get tables which have been fully backed up", ioe);
-    }
+
+    registerBulkLoad(ctx, finalPaths);
   }
 
   @Override
@@ -89,19 +79,31 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
       LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
       return;
     }
+
+    List<Path> hfiles = new ArrayList<>(pairs.size());
+    for (Pair<Path, Path> pair : pairs) {
+      hfiles.add(pair.getSecond());
+    }
+    registerBulkLoad(ctx, Collections.singletonMap(family, hfiles));
+  }
+
+  private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+    Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    RegionInfo region = ctx.getEnvironment().getRegionInfo();
+    TableName tableName = region.getTable();
+
     try (Connection connection = ConnectionFactory.createConnection(cfg);
       BackupSystemTable tbl = new BackupSystemTable(connection)) {
-      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
-      RegionInfo info = ctx.getEnvironment().getRegionInfo();
-      TableName tableName = info.getTable();
-      if (!fullyBackedUpTables.contains(tableName)) {
+      Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+
+      if (fullyBackedUpTables.contains(tableName)) {
+        tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
+      } else {
         if (LOG.isTraceEnabled()) {
-          LOG.trace(tableName + " has not gone thru full backup");
+          LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
         }
-        return;
       }
-      tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
-      return;
     }
   }
 }
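
Both hooks now delegate to the same private registerBulkLoad helper; preCommitStoreFile only
adapts its List<Pair<Path, Path>> argument into the Map<byte[], List<Path>> shape that
postBulkLoadHFile already receives. A condensed sketch of that adaptation, reusing the names
from the hunk above (pairs and family are the hook parameters):

    // Only the second path of each pair is recorded, as in the patch.
    List<Path> hfiles = new ArrayList<>(pairs.size());
    for (Pair<Path, Path> pair : pairs) {
      hfiles.add(pair.getSecond());
    }
    // A single-family map gives preCommitStoreFile the same shape as postBulkLoadHFile's finalPaths.
    registerBulkLoad(ctx, Collections.singletonMap(family, hfiles));
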
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index e3644c7d7f0..5afd580a649 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -355,8 +354,7 @@ public class BackupManager implements Closeable {
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
-  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 01106b4cd0e..203f3f61b0f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,10 +178,6 @@ public final class BackupSystemTable implements Closeable {
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
   final static byte[] PATH_COL = Bytes.toBytes("path");
-  final static byte[] STATE_COL = Bytes.toBytes("state");
-  // the two states a bulk loaded file can be
-  final static byte[] BL_PREPARE = Bytes.toBytes("R");
-  final static byte[] BL_COMMIT = Bytes.toBytes("D");
 
   private final static String SET_KEY_PREFIX = "backupset:";
 
@@ -378,7 +373,7 @@ public final class BackupSystemTable implements Closeable {
         }
         files.add(new Path(path));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+          LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
         }
       }
 
@@ -401,43 +396,22 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  /*
-   * For postBulkLoadHFile() hook.
-   * @param tabName table name
-   * @param region the region receiving hfile
-   * @param finalPaths family and associated hfiles
+  /**
+   * Registers a bulk load.
+   * @param tableName     table name
+   * @param region        the region receiving hfile
+   * @param cfToHfilePath column family and associated hfiles
    */
-  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
-    Map<byte[], List<Path>> finalPaths) throws IOException {
+  public void registerBulkLoad(TableName tableName, byte[] region,
+    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
-        + " entries");
+      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
+        cfToHfilePath.size());
     }
     try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
-      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
+      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
       bufferedMutator.mutate(puts);
-      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
-    }
-  }
-
-  /*
-   * For preCommitStoreFile() hook
-   * @param tabName table name
-   * @param region the region receiving hfile
-   * @param family column family
-   * @param pairs list of paths for hfiles
-   */
-  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
-    final List<Pair<Path, Path>> pairs) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
-    }
-    try (Table table = connection.getTable(bulkLoadTableName)) {
-      List<Put> puts =
-        BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
-      table.put(puts);
-      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+      LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
     }
   }
 
@@ -459,33 +433,25 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  /*
+  /**
    * Reads the rows from backup table recording bulk loaded hfiles
    * @param tableList list of table names
-   * @return The keys of the Map are table, region and column family. Value of the map reflects
-   * whether the hfile was recorded by preCommitStoreFile hook (true)
-   */
-  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
-
-    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
-    List<byte[]> rows = new ArrayList<>();
-    for (TableName tTable : tableList) {
-      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
-      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
-      try (Table table = connection.getTable(bulkLoadTableName);
-        ResultScanner scanner = table.getScanner(scan)) {
-        Result res = null;
+   */
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+    List<BulkLoad> result = new ArrayList<>();
+    for (TableName table : tableList) {
+      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
+      try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
+        ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
+        Result res;
         while ((res = scanner.next()) != null) {
           res.advance();
           String fam = null;
           String path = null;
-          boolean raw = false;
-          byte[] row;
           String region = null;
+          byte[] row = null;
           for (Cell cell : res.listCells()) {
             row = CellUtil.cloneRow(cell);
-            rows.add(row);
             String rowStr = Bytes.toString(row);
            region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
             if (
@@ -498,35 +464,14 @@ public final class BackupSystemTable implements Closeable {
                 BackupSystemTable.PATH_COL.length) == 0
             ) {
               path = Bytes.toString(CellUtil.cloneValue(cell));
-            } else if (
-              CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
-                BackupSystemTable.STATE_COL.length) == 0
-            ) {
-              byte[] state = CellUtil.cloneValue(cell);
-              if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
-                raw = true;
-              } else {
-                raw = false;
-              }
             }
           }
-          if (map.get(tTable) == null) {
-            map.put(tTable, new HashMap<>());
-            tblMap = map.get(tTable);
-          }
-          if (tblMap.get(region) == null) {
-            tblMap.put(region, new HashMap<>());
-          }
-          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
-          if (famMap.get(fam) == null) {
-            famMap.put(fam, new ArrayList<>());
-          }
-          famMap.get(fam).add(new Pair<>(path, raw));
+          result.add(new BulkLoad(table, region, fam, path, row));
          LOG.debug("found orig " + path + " for " + fam + " of table " + region);
         }
       }
     }
-    return new Pair<>(map, rows);
+    return result;
   }
 
   /*
@@ -793,20 +738,19 @@ public final class BackupSystemTable implements Closeable {
     return result;
   }
 
-  /*
-   * Retrieve TableName's for completed backup of given type
-   * @param type backup type
-   * @return List of table names
+  /**
+   * Retrieve all table names that are part of any known backup
    */
-  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+  public Set<TableName> getTablesIncludedInBackups() throws IOException {
     Set<TableName> names = new HashSet<>();
     List<BackupInfo> infos = getBackupHistory(true);
     for (BackupInfo info : infos) {
-      if (info.getType() == type) {
+      // Incremental backups have the same tables as the preceding full backups
+      if (info.getType() == BackupType.FULL) {
         names.addAll(info.getTableNames());
       }
     }
-    return new ArrayList<>(names);
+    return names;
   }
 
   /**
@@ -1500,13 +1444,13 @@ public final class BackupSystemTable implements Closeable {
     return s.substring(index + 1);
   }
 
-  /*
-   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
+  /**
+   * Creates Put's for bulk loads.
    */
-  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
-    Map<byte[], List<Path>> finalPaths) {
+  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
+    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
     List<Put> puts = new ArrayList<>();
-    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
       for (Path path : entry.getValue()) {
         String file = path.toString();
         int lastSlash = file.lastIndexOf("/");
@@ -1516,10 +1460,8 @@ public final class BackupSystemTable implements Closeable {
         put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
         put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
         put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
-        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
         puts.add(put);
-        LOG
-          .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
+        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
       }
     }
     return puts;
@@ -1580,29 +1522,6 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-  /*
-   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
-   */
-  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
-    final List<Pair<Path, Path>> pairs) {
-    List<Put> puts = new ArrayList<>(pairs.size());
-    for (Pair<Path, Path> pair : pairs) {
-      Path path = pair.getSecond();
-      String file = path.toString();
-      int lastSlash = file.lastIndexOf("/");
-      String filename = file.substring(lastSlash + 1);
-      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-        Bytes.toString(region), BLK_LD_DELIM, filename));
-      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
-      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
-      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
-      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
-      puts.add(put);
-      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
-    }
-    return puts;
-  }
-
   public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
     List<Delete> lstDels = new ArrayList<>(lst.size());
     for (TableName table : lst) {
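
The new getTablesIncludedInBackups() replaces getTablesForBackupType(BackupType.FULL) for both
BackupHFileCleaner and BackupObserver. A minimal usage sketch, assuming an open Connection as in
those callers:

    try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
      // Tables covered by any full backup; incremental backups cover the same table set.
      Set<TableName> backedUpTables = tbl.getTablesIncludedInBackups();
      if (backedUpTables.contains(tableName)) {
        // bulk loads into this table must be tracked for the next incremental backup
      }
    }
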
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
new file mode 100644
index 00000000000..0f1e79c976b
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The data corresponding to a single bulk-loaded file that is being tracked by the backup logic.
+ */
+@InterfaceAudience.Private
+public class BulkLoad {
+  private final TableName tableName;
+  private final String region;
+  private final String columnFamily;
+  private final String hfilePath;
+  private final byte[] rowKey;
+
+  public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
+    byte[] rowKey) {
+    this.tableName = tableName;
+    this.region = region;
+    this.columnFamily = columnFamily;
+    this.hfilePath = hfilePath;
+    this.rowKey = rowKey;
+  }
+
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  public String getRegion() {
+    return region;
+  }
+
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  public String getHfilePath() {
+    return hfilePath;
+  }
+
+  public byte[] getRowKey() {
+    return rowKey;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof BulkLoad)) {
+      return false;
+    }
+    BulkLoad that = (BulkLoad) o;
+    return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
+      .append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
+      .append(rowKey, that.rowKey).isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
+      .append(hfilePath).append(rowKey).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
+      .append("tableName", tableName).append("region", region).append("columnFamily", columnFamily)
+      .append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
+  }
+}
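
Not part of the patch: a small illustration of how the new value class is consumed, using only
the accessors defined above (table, region, family, path and row key are placeholder values):

    // Hypothetical record, shaped like what readBulkloadRows() returns.
    BulkLoad bulkLoad = new BulkLoad(TableName.valueOf("ns", "tbl"), "abcdef0123456789", "cf",
      "/hbase/data/ns/tbl/abcdef0123456789/cf/hfile1", Bytes.toBytes("row-key"));

    // The incremental backup client only needs these flat accessors:
    Path hfile = new Path(bulkLoad.getHfilePath());
    byte[] rowToDeleteAfterBackup = bulkLoad.getRowKey();
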
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 6ad487337c2..03a6ecc02f3 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,16 +48,16 @@ import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
 
 /**
@@ -110,22 +110,17 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return -1;
   }
 
-  /*
+  /**
    * Reads bulk load records from backup table, iterates through the records and forms the paths for
    * bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT
    * clean up the entries in the bulk load system table. Those entries should not be cleaned until
    * the backup is marked as complete.
-   * @param sTableList list of tables to be backed up
-   * @return the rowkeys of bulk loaded files
+   * @param tablesToBackup list of tables to be backed up
    */
-  @SuppressWarnings("unchecked")
-  protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {
-    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
     List<String> activeFiles = new ArrayList<>();
     List<String> archiveFiles = new ArrayList<>();
-    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
-      backupManager.readBulkloadRows(sTableList);
-    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
+    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -135,74 +130,46 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
-      .entrySet()) {
-      TableName srcTable = tblEntry.getKey();
+    for (BulkLoad bulkLoad : bulkLoads) {
+      TableName srcTable = bulkLoad.getTableName();
+      String regionName = bulkLoad.getRegion();
+      String fam = bulkLoad.getColumnFamily();
+      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
 
-      int srcIdx = getIndex(srcTable, sTableList);
-      if (srcIdx < 0) {
-        LOG.warn("Couldn't find " + srcTable + " in source table List");
+      if (!tablesToBackup.contains(srcTable)) {
+        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
         continue;
       }
-      if (mapForSrc[srcIdx] == null) {
-        mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      }
       Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
-        srcTable.getQualifierAsString());
-      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
-        .getValue().entrySet()) {
-        String regionName = regionEntry.getKey();
-        Path regionDir = new Path(tblDir, regionName);
-        // map from family to List of hfiles
-        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
-          .entrySet()) {
-          String fam = famEntry.getKey();
-          Path famDir = new Path(regionDir, fam);
-          List<Path> files;
-          if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
-            files = new ArrayList<>();
-            mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
-          } else {
-            files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
-          }
-          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-          String tblName = srcTable.getQualifierAsString();
-          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
-          if (!tgtFs.mkdirs(tgtFam)) {
-            throw new IOException("couldn't create " + tgtFam);
-          }
-          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
-            String file = fileWithState.getFirst();
-            int idx = file.lastIndexOf("/");
-            String filename = file;
-            if (idx > 0) {
-              filename = file.substring(idx + 1);
-            }
-            Path p = new Path(famDir, filename);
-            Path tgt = new Path(tgtFam, filename);
-            Path archive = new Path(archiveDir, filename);
-            if (fs.exists(p)) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
-              }
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("copying " + p + " to " + tgt);
-              }
-              activeFiles.add(p.toString());
-            } else if (fs.exists(archive)) {
-              LOG.debug("copying archive " + archive + " to " + tgt);
-              archiveFiles.add(archive.toString());
-            }
-            files.add(tgt);
-          }
+      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+
+      String srcTableQualifier = srcTable.getQualifierAsString();
+      String srcTableNs = srcTable.getNamespaceAsString();
+      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+      if (!tgtFs.mkdirs(tgtFam)) {
+        throw new IOException("couldn't create " + tgtFam);
+      }
+      Path tgt = new Path(tgtFam, filename);
+
+      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+      Path archive = new Path(archiveDir, filename);
+
+      if (fs.exists(p)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
+            srcTableQualifier);
+          LOG.trace("copying {} to {}", p, tgt);
         }
+        activeFiles.add(p.toString());
+      } else if (fs.exists(archive)) {
+        LOG.debug("copying archive {} to {}", archive, tgt);
+        archiveFiles.add(archive.toString());
       }
     }
 
     copyBulkLoadedFiles(activeFiles, archiveFiles);
-
-    return pair.getSecond();
+    return bulkLoads;
   }
 
   private void copyBulkLoadedFiles(List<String> activeFiles, List<String> 
archiveFiles)
@@ -327,11 +294,12 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
 
-      List<byte[]> bulkLoadedRows = handleBulkLoad(backupInfo.getTableNames());
+      List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());
 
       // backup complete
       completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);
 
+      List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
       backupManager.deleteBulkLoadedRows(bulkLoadedRows);
     } catch (IOException e) {
       failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
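
For each BulkLoad record, the reworked handleBulkLoad resolves the live hfile location and its
archived fallback before copying. A condensed sketch of that resolution using the same utility
calls as the patch (error handling omitted):

    // Where the hfile lives if still active, and where it lives once compacted away and archived.
    Path tableDir = CommonFSUtils.getTableDir(rootdir, bulkLoad.getTableName());
    String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
    Path active = new Path(tableDir, bulkLoad.getRegion() + Path.SEPARATOR
      + bulkLoad.getColumnFamily() + Path.SEPARATOR + filename);
    Path archived = new Path(HFileArchiveUtil.getStoreArchivePath(conf, bulkLoad.getTableName(),
      bulkLoad.getRegion(), bulkLoad.getColumnFamily()), filename);
    // Whichever of the two exists is copied under <backup-root>/<backup-id>/<ns>/<table>/<region>/<cf>/.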
