anmolnar commented on code in PR #7400:
URL: https://github.com/apache/hbase/pull/7400#discussion_r2452637490
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> tablesToPrevBackupTs)
+    throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List<BulkLoad> bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
+        Path tgt = new Path(tgtFam, filename);
+
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+        Path archive = new Path(archiveDir, filename);
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath);
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
        }
      }
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
      }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List<Path> bulkloadPaths =
+          BulkFilesCollector.collectFromWalDirs(conf, walDirsCsv, collectorOutput, table, table,
Review Comment:
I agree with @ankitsol. This class should not make a call to an abstract class (you would have to make the method public); instead, move more logic into the utility class if you want to share more.
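
For illustration only, a minimal sketch of that direction, assuming the shared per-table collection loop can be hosted as a static helper. BulkLoadCollectionHelper, WalDirCollector, and collectForTables are hypothetical names, and the collector-job invocation is hidden behind a small functional interface because its full argument list is truncated in the diff above:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;

// Hypothetical utility sketch: keep the per-table collection loop in a shared
// static helper so the backup client never has to call into, or expose a public
// method on, an abstract class.
public final class BulkLoadCollectionHelper {

  /** Abstraction over the collector job; the caller supplies the real invocation. */
  @FunctionalInterface
  public interface WalDirCollector {
    List<Path> collect(String walDirsCsv, TableName table) throws IOException;
  }

  private BulkLoadCollectionHelper() {
  }

  // Runs the supplied collector once per table over that table's backed-up WAL
  // dirs and aggregates the discovered bulkload HFile paths.
  public static List<Path> collectForTables(List<TableName> tables,
    Map<TableName, List<String>> tablesToWALFileList, WalDirCollector collector)
    throws IOException {
    List<Path> allBulkloadPaths = new ArrayList<>();
    for (TableName table : tables) {
      String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
      allBulkloadPaths.addAll(collector.collect(walDirsCsv, table));
    }
    return allBulkloadPaths;
  }
}

The client could then pass a lambda wrapping the BulkFilesCollector.collectFromWalDirs call shown in the diff, keeping the shared logic in one place without turning a protected method on the abstract client into a public one.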