vinayakphegde commented on code in PR #7400:
URL: https://github.com/apache/hbase/pull/7400#discussion_r2452706112


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl, 
List<TableName> sTableList) {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) 
throws IOException {
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> 
tablesToPrevBackupTs)
+    throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, 
backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List<BulkLoad> bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + 
Path.SEPARATOR + filename);
+
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + 
srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
+        Path tgt = new Path(tgtFam, filename);
+
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, 
regionName, fam);
+        Path archive = new Path(archiveDir, filename);
 
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + 
Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup 
directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && 
!Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = 
BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + 
dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + 
srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + 
Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", 
fullBulkLoadBackupPath);
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", 
bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
         }
       }
 
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + 
srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
       }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over 
backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), 
BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
 
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, 
regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List<Path> bulkloadPaths =
+          BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, 
collectorOutput, table, table,

Review Comment:
   > `BulkFilesCollector#collectFromWalDirs()` is itself a utility function. I 
have already computed the valid WAL directories once using 
`BackupUtils#getValidWalDirs()` in 
`IncrementalTableBackupClient#convertWALsToHFiles()`, so I am reusing them here. 
If I call `AbstractPitrRestoreHandler#collectBulkFiles()`, it would call 
`BackupUtils#getValidWalDirs()` again.
   
   Consider passing the already-computed valid WAL directories as a parameter. 
Adjust the original methods as minimally as possible to accommodate both scenarios.
   
   > This class should not make a call to an abstract class
   
   No, as mentioned earlier, we should move the shared elements to a utility 
class.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to