taklwu commented on code in PR #7400:
URL: https://github.com/apache/hbase/pull/7400#discussion_r2449776589


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> tablesToPrevBackupTs)
+    throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List<BulkLoad> bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
+        Path tgt = new Path(tgtFam, filename);
+
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+        Path archive = new Path(archiveDir, filename);
 
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath);
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", 
bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
         }
       }
 
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
       }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
 
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List<Path> bulkloadPaths =
+          BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table,
+            tablesToPrevBackupTs.get(table), backupInfo.getIncrCommittedWalTs());
 
-      if (fs.exists(p)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("found bulk hfile {} in {} for {}", 
bulkLoad.getHfilePath(), p.getParent(),
-            srcTableQualifier);
-          LOG.trace("copying {} to {}", p, tgt);
+        List<String> bulkLoadFiles =
+          bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList());
+
+        if (bulkLoadFiles.isEmpty()) {
+          LOG.info("No bulk-load files found for table {}", table);
+        } else {
+          mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
         }

Review Comment:
   [nit] using `continue` here would align the style with the other loop in the `!backupInfo.isContinuousBackupEnabled()` branch
   ```suggestion
           if (bulkLoadFiles.isEmpty()) {
             LOG.info("No bulk-load files found for table {}", table);
             continue;
           } 
           mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java:
##########
@@ -96,8 +96,11 @@ private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnviron
     try (Connection connection = ConnectionFactory.createConnection(cfg);
       BackupSystemTable tbl = new BackupSystemTable(connection)) {
       Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+      Map<TableName, Long> continuousBackupTableSet = tbl.getContinuousBackupTableSet();
 
-      if (fullyBackedUpTables.contains(tableName)) {
+      if (
+        fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)

Review Comment:
   [nit] before this change, did you see a lot of entries that kept re-registering the same table? If so, and if that happens outside of unit tests, do you think it points to a logic error in that trigger?
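   In case it helps to check, here is a rough sketch (illustrative only, not part of the PR) that counts the rows currently registered for one table, reusing `BackupSystemTable#readBulkloadRows` the way the backup manager does in `handleBulkLoad` above:
   ```java
   // Rough sketch: count bulk-load rows currently registered for a table in the
   // backup system table. Assumes the readBulkloadRows(List<TableName>) accessor
   // used (via the backup manager) in handleBulkLoad.
   private static int countRegisteredBulkLoads(Configuration conf, TableName tableName)
     throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(conf);
       BackupSystemTable backupSystemTable = new BackupSystemTable(connection)) {
       return backupSystemTable.readBulkloadRows(Collections.singletonList(tableName)).size();
     }
   }
   ```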



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -533,11 +543,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList, long p
     conf.set(JOB_NAME_CONF_KEY, jobname);
     if (backupInfo.isContinuousBackupEnabled()) {
       conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
-      // committedWALsTs is needed only for Incremental backups with continuous backup
-      // since these do not depend on log roll ts
-      long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
-      backupInfo.setIncrCommittedWalTs(committedWALsTs);
-      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
+      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));

Review Comment:
   Do we have any existing unit test covering this change?
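   If not, here is a minimal sketch of the kind of assertion a new test could make (purely illustrative; `setIncrCommittedWalTs`/`getIncrCommittedWalTs` are the `BackupInfo` accessors this PR appears to introduce):
   ```java
   // Illustrative only: the end time handed to WALInputFormat should be the
   // committed-WAL ts already recorded on BackupInfo, not a fresh
   // replication-checkpoint lookup.
   @Test
   public void testEndTimeKeyComesFromBackupInfo() {
     BackupInfo backupInfo = new BackupInfo();
     backupInfo.setIncrCommittedWalTs(12345L); // accessor assumed from this PR

     Configuration conf = HBaseConfiguration.create();
     // Mirrors the continuous-backup branch under review in walToHFiles.
     conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));

     assertEquals("12345", conf.get(WALInputFormat.END_TIME_KEY));
   }
   ```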


