This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new 41b217d3204 HBASE-29406: Skip Copying Bulkloaded Files to Backup Location in Continuous Backup (#7119)
41b217d3204 is described below
commit 41b217d3204194639902929d3df4ea900dd7c643
Author: vinayak hegde <[email protected]>
AuthorDate: Fri Jun 27 23:53:52 2025 +0530
HBASE-29406: Skip Copying Bulkloaded Files to Backup Location in Continuous Backup (#7119)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Reviewed by: Kevin Geiszler <[email protected]>
---
.../hadoop/hbase/backup/impl/BackupCommands.java | 14 +--
.../hbase/backup/impl/FullTableBackupClient.java | 8 --
.../replication/BackupFileSystemManager.java | 11 +-
.../backup/replication/BulkLoadProcessor.java | 96 -----------------
.../ContinuousBackupReplicationEndpoint.java | 93 +---------------
.../hbase/backup/TestBackupDeleteWithCleanup.java | 19 +---
.../hbase/backup/impl/TestBackupCommands.java | 7 +-
.../TestContinuousBackupReplicationEndpoint.java | 117 ++-------------------
8 files changed, 18 insertions(+), 347 deletions(-)
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 2020b84bc1c..3ae97c487ef 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -1004,7 +1004,6 @@ public final class BackupCommands {
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
- Path bulkloadDir = manager.getBulkLoadFilesDir();
// Delete contents under WAL directory
if (fs.exists(walDir)) {
@@ -1015,15 +1014,6 @@ public final class BackupCommands {
System.out.println("Deleted all contents under WAL directory: " +
walDir);
}
- // Delete contents under bulk load directory
- if (fs.exists(bulkloadDir)) {
- FileStatus[] bulkContents = fs.listStatus(bulkloadDir);
- for (FileStatus item : bulkContents) {
- fs.delete(item.getPath(), true); // recursive delete of each child
- }
- System.out.println("Deleted all contents under Bulk Load directory:
" + bulkloadDir);
- }
-
} catch (IOException e) {
System.out.println("WARNING: Failed to delete contents under backup
directories: "
+ backupWalDir + ". Error: " + e.getMessage());
@@ -1032,7 +1022,7 @@ public final class BackupCommands {
}
/**
- * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
+ * Cleans up old WAL files based on the determined cutoff timestamp.
*/
void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
throws IOException {
@@ -1043,7 +1033,6 @@ public final class BackupCommands {
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
- Path bulkloadDir = manager.getBulkLoadFilesDir();
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -1069,7 +1058,6 @@ public final class BackupCommands {
if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
System.out.println("Deleting outdated WAL directory: " + dirPath);
fs.delete(dirPath, true);
- fs.delete(new Path(bulkloadDir, dirName), true);
}
} catch (ParseException e) {
System.out.println("WARNING: Failed to parse directory name '" +
dirName
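
For readers following the cleanup logic above: WAL backups are laid out in one directory per UTC day (named with DATE_FORMAT, "yyyy-MM-dd"), and a day directory is removed only once its entire day lies before the cutoff. A minimal standalone sketch of that retention check follows; the class and variable names are illustrative, not part of the patch.

    import java.text.SimpleDateFormat;
    import java.util.TimeZone;

    public class WalDayCutoffSketch {
      static final long ONE_DAY_IN_MILLISECONDS = 24L * 60 * 60 * 1000;

      // True when the whole UTC day named by dirName ("yyyy-MM-dd") ends before cutoffTime.
      static boolean isOutdated(String dirName, long cutoffTime) throws java.text.ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        long dayStart = dateFormat.parse(dirName).getTime();
        return dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime;
      }

      public static void main(String[] args) throws Exception {
        // Example: a directory from a past date is outdated relative to the current time.
        System.out.println(isOutdated("2025-06-20", System.currentTimeMillis()));
      }
    }
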
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 3f6ae3deb63..5b49496d626 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -190,14 +190,6 @@ public class FullTableBackupClient extends TableBackupClient {
// set overall backup status: complete. Here we make sure to complete the backup.
// After this checkpoint, even if entering cancel process, will let the backup finished
backupInfo.setState(BackupState.COMPLETE);
-
- if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
- System.out.println("NOTE: Bulkload replication is not enabled. "
- + "Bulk loaded files will not be backed up as part of continuous
backup. "
- + "To ensure bulk loaded files are included in the backup, please
enable bulkload replication "
- + "(hbase.replication.bulkload.enabled=true) and configure other
necessary settings "
- + "to properly enable bulkload replication.");
- }
}
private void handleNonContinuousBackup(Admin admin) throws IOException {
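
The operator note removed above pointed at hbase.replication.bulkload.enabled. For reference, that flag (HConstants.REPLICATION_BULKLOAD_ENABLE_KEY) and the replication cluster id are the same keys the endpoint test further down sets programmatically; a minimal sketch of that configuration, with placeholder values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class BulkloadReplicationConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // hbase.replication.bulkload.enabled: ship bulk-loaded HFiles through replication
        conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
        // A cluster id is required alongside bulkload replication ("clusterId1" is a placeholder)
        conf.set(HConstants.REPLICATION_CLUSTER_ID, "clusterId1");
        System.out.println(conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
      }
    }
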
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
index 225d3217276..9d1d818c207 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -26,20 +26,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and
- * bulk-loaded files within the specified backup root directory.
+ * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) files within
+ * the specified backup root directory.
*/
@InterfaceAudience.Private
public class BackupFileSystemManager {
private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class);
public static final String WALS_DIR = "WALs";
- public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
private final String peerId;
private final FileSystem backupFs;
private final Path backupRootDir;
private final Path walsDir;
- private final Path bulkLoadFilesDir;
public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr)
throws IOException {
@@ -47,7 +45,6 @@ public class BackupFileSystemManager {
this.backupRootDir = new Path(backupRootDirStr);
this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
this.walsDir = createDirectory(WALS_DIR);
- this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
}
private Path createDirectory(String dirName) throws IOException {
@@ -61,10 +58,6 @@ public class BackupFileSystemManager {
return walsDir;
}
- public Path getBulkLoadFilesDir() {
- return bulkLoadFilesDir;
- }
-
public FileSystem getBackupFs() {
return backupFs;
}
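
After this change BackupFileSystemManager creates and exposes only the WALs directory under the backup root. A short usage sketch against the API that remains above; the peer id and root path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;

    public class BackupWalDirSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // "peer1" and the backup root are placeholders for this sketch
        BackupFileSystemManager manager =
          new BackupFileSystemManager("peer1", conf, "hdfs://backup-cluster/hbase-backups");
        FileSystem fs = manager.getBackupFs();
        Path walsDir = manager.getWalsDir(); // <root>/WALs, created by the constructor
        System.out.println("WAL backup dir exists: " + fs.exists(walsDir));
      }
    }
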
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
deleted file mode 100644
index 6e1271313bc..00000000000
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
-/**
- * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication.
- * <p>
- * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL
- * entries. It processes bulk load descriptors and their associated store descriptors to generate
- * the paths for each bulk-loaded file.
- * <p>
- * The class is designed for scenarios where replicable bulk load operations need to be parsed and
- * their file paths need to be determined programmatically.
- * </p>
- */
-@InterfaceAudience.Private
-public final class BulkLoadProcessor {
- private BulkLoadProcessor() {
- }
-
- public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) throws IOException {
- List<Path> bulkLoadFilePaths = new ArrayList<>();
-
- for (WAL.Entry entry : walEntries) {
- WALEdit edit = entry.getEdit();
- for (Cell cell : edit.getCells()) {
- if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
- TableName tableName = entry.getKey().getTableName();
- String namespace = tableName.getNamespaceAsString();
- String table = tableName.getQualifierAsString();
- bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
- }
- }
- }
- return bulkLoadFilePaths;
- }
-
- private static List<Path> processBulkLoadDescriptor(Cell cell, String namespace, String table)
- throws IOException {
- List<Path> bulkLoadFilePaths = new ArrayList<>();
- WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-
- if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) {
- return bulkLoadFilePaths; // Skip if not replicable
- }
-
- String regionName = bld.getEncodedRegionName().toStringUtf8();
- for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
- bulkLoadFilePaths
- .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName));
- }
-
- return bulkLoadFilePaths;
- }
-
- private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor,
- String namespace, String table, String regionName) {
- List<Path> paths = new ArrayList<>();
- String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
-
- for (String storeFile : storeDescriptor.getStoreFileList()) {
- paths.add(new Path(namespace,
- new Path(table, new Path(regionName, new Path(columnFamily, storeFile)))));
- }
-
- return paths;
- }
-}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index eeacc8fbf34..bf3fbd531bf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -33,10 +33,8 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
@@ -56,8 +53,8 @@ import org.slf4j.LoggerFactory;
/**
* ContinuousBackupReplicationEndpoint is responsible for replicating WAL entries to a backup
* storage. It organizes WAL entries by day and periodically flushes the data, ensuring that WAL
- * files do not exceed the configured size. The class includes mechanisms for handling the WAL
- * files, performing bulk load backups, and ensuring that the replication process is safe.
+ * files do not exceed the configured size. The class includes mechanisms for handling the WAL files
+ * and ensuring that the replication process is safe.
*/
@InterfaceAudience.Private
public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint {
@@ -292,20 +289,11 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
try {
FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter);
- List<Path> bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId),
- bulkLoadFiles.size());
- LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
- bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
- }
for (WAL.Entry entry : walEntries) {
walWriter.append(entry);
}
walWriter.sync(true);
- uploadBulkLoadFiles(day, bulkLoadFiles);
} catch (UncheckedIOException e) {
String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
LOG.error("{} Backup failed for day {}. Error: {}",
Utils.logPeerId(peerId), day,
@@ -375,41 +363,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
}
}
- private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
- LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
- bulkLoadFiles.size());
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
- bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
- }
- String dayDirectoryName = formatToDateString(dayInMillis);
- Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
- backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
-
- for (Path file : bulkLoadFiles) {
- Path sourcePath = getBulkLoadFileStagingPath(file);
- Path destPath = new Path(bulkloadDir, file);
-
- try {
- LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
- destPath);
-
- FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
- backupFileSystemManager.getBackupFs(), destPath, false, conf);
-
- LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file,
- destPath);
- } catch (IOException e) {
- LOG.error("{} Failed to back up bulk load file {}: {}", Utils.logPeerId(peerId), file,
- e.getMessage(), e);
- throw e;
- }
- }
-
- LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
- }
-
/**
* Convert dayInMillis to "yyyy-MM-dd" format
*/
@@ -419,48 +372,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
return dateFormat.format(new Date(dayInMillis));
}
- private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
- FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
- Path rootDir = CommonFSUtils.getRootDir(conf);
- Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
- Path baseNamespaceDir = new Path(rootDir, baseNSDir);
- Path hFileArchiveDir =
- new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
-
- LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", Utils.logPeerId(peerId),
- relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
-
- Path result =
- findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace);
-
- if (result == null) {
- LOG.error("{} No bulk loaded file found in relative path: {}", Utils.logPeerId(peerId),
- relativePathFromNamespace);
- throw new IOException(
- "No Bulk loaded file found in relative path: " + relativePathFromNamespace);
- }
-
- LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), result);
- return result;
- }
-
- private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir,
- Path hFileArchiveDir, Path filePath) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking for bulk load file at: {} and {}", new Path(baseNamespaceDir, filePath),
- new Path(hFileArchiveDir, filePath));
- }
-
- for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
- new Path(hFileArchiveDir, filePath) }) {
- if (rootFs.exists(candidate)) {
- LOG.debug("Found bulk load file at: {}", candidate);
- return candidate;
- }
- }
- return null;
- }
-
private void shutdownFlushExecutor() {
if (flushExecutor != null) {
LOG.info("{} Initiating WAL flush executor shutdown.",
Utils.logPeerId(peerId));
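
With the bulk-load path removed, the endpoint's backup step reduces to grouping WAL entries by day and appending them to that day's writer, with day directories named by the same yyyy-MM-dd convention shown above. A simplified sketch of the day-bucketing idea; the helper names are illustrative and not the endpoint's actual code:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TimeZone;

    public class WalDayBucketingSketch {
      static final long ONE_DAY_IN_MILLISECONDS = 24L * 60 * 60 * 1000;

      // Truncate a timestamp to the start of its UTC day; entries sharing a day share a writer.
      static long dayKey(long writeTimeMillis) {
        return (writeTimeMillis / ONE_DAY_IN_MILLISECONDS) * ONE_DAY_IN_MILLISECONDS;
      }

      // Mirrors the "yyyy-MM-dd" naming used for the per-day backup WAL directories.
      static String dayDirName(long dayInMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(dayInMillis));
      }

      public static void main(String[] args) {
        Map<Long, List<Long>> entriesByDay = new HashMap<>();
        for (long ts : new long[] { 1751000000000L, 1751086400000L }) {
          entriesByDay.computeIfAbsent(dayKey(ts), d -> new java.util.ArrayList<>()).add(ts);
        }
        entriesByDay.keySet().forEach(day -> System.out.println(dayDirName(day)));
      }
    }
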
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index 07c9110072b..d22f4c9cda9 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.backup;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
@@ -165,7 +164,7 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
// Step 6: Verify that the backup WAL directory is empty
assertTrue("WAL backup directory should be empty after force delete",
- areWalAndBulkloadDirsEmpty(conf1, backupWalDir.toString()));
+ isWalDirsEmpty(conf1, backupWalDir.toString()));
// Step 7: Take new full backup with continuous backup enabled
String backupIdContinuous = fullTableBackupWithContinuous(Lists.newArrayList(table1));
@@ -190,35 +189,28 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
throws IOException {
Path walsDir = new Path(backupWalDir, WALS_DIR);
- Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
fs.mkdirs(walsDir);
- fs.mkdirs(bulkLoadDir);
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
for (int i = 0; i < 5; i++) {
String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
fs.mkdirs(new Path(walsDir, dateStr));
- fs.mkdirs(new Path(bulkLoadDir, dateStr));
}
}
private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
throws IOException {
Path walsDir = new Path(backupWalDir, WALS_DIR);
- Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
// Expect folders older than 3 days to be deleted
for (int i = 3; i < 5; i++) {
String oldDateStr = dateFormat.format(new Date(currentTime - (i *
ONE_DAY_IN_MILLISECONDS)));
Path walPath = new Path(walsDir, oldDateStr);
- Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);
assertFalse("Old WAL directory (" + walPath + ") should be deleted, but
it exists!",
fs.exists(walPath));
- assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be
deleted, but it exists!",
- fs.exists(bulkLoadPath));
}
// Expect folders within the last 3 days to exist
@@ -226,13 +218,9 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
String recentDateStr =
dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
Path walPath = new Path(walsDir, recentDateStr);
- Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);
assertTrue("Recent WAL directory (" + walPath + ") should exist, but it
is missing!",
fs.exists(walPath));
- assertTrue(
- "Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it
is missing!",
- fs.exists(bulkLoadPath));
}
}
@@ -276,16 +264,15 @@ public class TestBackupDeleteWithCleanup extends TestBackupBase {
peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled());
}
- private static boolean areWalAndBulkloadDirsEmpty(Configuration conf, String backupWalDir)
+ private static boolean isWalDirsEmpty(Configuration conf, String backupWalDir)
throws IOException {
BackupFileSystemManager manager =
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
- Path bulkloadDir = manager.getBulkLoadFilesDir();
- return isDirectoryEmpty(fs, walDir) && isDirectoryEmpty(fs, bulkloadDir);
+ return isDirectoryEmpty(fs, walDir);
}
private static boolean isDirectoryEmpty(FileSystem fs, Path dirPath) throws IOException {
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index b2ebbd640bb..e00ebd6099f 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
@@ -135,7 +134,7 @@ public class TestBackupCommands extends TestBackupBase {
fs.mkdirs(backupWalDir);
long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
- setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
+ setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WALs folders
logDirectoryStructure(fs, backupWalDir, "Before cleanup:");
@@ -155,7 +154,6 @@ public class TestBackupCommands extends TestBackupBase {
private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
long cutoffTime) throws IOException {
Path walsDir = new Path(backupWalDir, WALS_DIR);
- Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -163,14 +161,11 @@ public class TestBackupCommands extends TestBackupBase {
long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
String dayDir = dateFormat.format(new Date(dayTime));
Path walPath = new Path(walsDir, dayDir);
- Path bulkPath = new Path(bulkLoadDir, dayDir);
if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
assertFalse("Old WAL dir should be deleted: " + walPath,
fs.exists(walPath));
- assertFalse("Old BulkLoad dir should be deleted: " + bulkPath,
fs.exists(bulkPath));
} else {
assertTrue("Recent WAL dir should exist: " + walPath,
fs.exists(walPath));
- assertTrue("Recent BulkLoad dir should exist: " + bulkPath,
fs.exists(bulkPath));
}
}
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index 253675f85d9..3919746d3b7 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hbase.backup.replication;
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
-import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
@@ -66,11 +64,8 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
-import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
@@ -96,14 +91,12 @@ public class TestContinuousBackupReplicationEndpoint {
private final String replicationEndpoint =
ContinuousBackupReplicationEndpoint.class.getName();
private static final String CF_NAME = "cf";
- private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");
static FileSystem fs = null;
static Path root;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Set the configuration properties as required
- conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(REPLICATION_CLUSTER_ID, "clusterId1");
TEST_UTIL.startMiniZKCluster();
@@ -122,7 +115,7 @@ public class TestContinuousBackupReplicationEndpoint {
}
@Test
- public void testWALAndBulkLoadFileBackup() throws IOException {
+ public void testWALBackup() throws IOException {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
TableName tableName = TableName.valueOf("table_" + methodName);
String peerId = "peerId";
@@ -140,15 +133,10 @@ public class TestContinuousBackupReplicationEndpoint {
loadRandomData(tableName, 100);
assertEquals(100, getRowCount(tableName));
- Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
- generateHFiles(dir);
- bulkLoadHFiles(tableName, dir);
- assertEquals(1100, getRowCount(tableName));
-
waitForReplication(15000);
deleteReplicationPeer(peerId);
- verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+ verifyBackup(backupRootDir.toString(), Map.of(tableName, 100));
deleteTable(tableName);
}
@@ -196,7 +184,7 @@ public class TestContinuousBackupReplicationEndpoint {
waitForReplication(15000);
deleteReplicationPeer(peerId);
- verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 100, table3, 50));
+ verifyBackup(backupRootDir.toString(), Map.of(table1, 100, table2, 100, table3, 50));
for (TableName table : List.of(table1, table2, table3)) {
deleteTable(table);
@@ -254,7 +242,7 @@ public class TestContinuousBackupReplicationEndpoint {
waitForReplication(20000);
deleteReplicationPeer(peerId);
- verifyBackup(backupRootDir.toString(), false, Map.of(tableName, getRowCount(tableName)));
+ verifyBackup(backupRootDir.toString(), Map.of(tableName, getRowCount(tableName)));
deleteTable(tableName);
}
@@ -301,7 +289,7 @@ public class TestContinuousBackupReplicationEndpoint {
waitForReplication(15000);
deleteReplicationPeer(peerId);
- verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200));
+ verifyBackup(backupRootDir.toString(), Map.of(tableName, 200));
// Verify that WALs are stored in two directories, one for each day
Path walDir = new Path(backupRootDir, WALS_DIR);
@@ -370,42 +358,6 @@ public class TestContinuousBackupReplicationEndpoint {
}
}
- private void bulkLoadHFiles(TableName tableName, Path inputDir) throws IOException {
- TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-
- try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
- BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
- loader.bulkLoad(table.getName(), inputDir);
- } finally {
- TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
- }
- }
-
- private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> family2Files)
- throws IOException {
- TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-
- try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
- BulkLoadHFiles loader = new BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
- loader.bulkLoad(table.getName(), family2Files);
- } finally {
- TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
- }
- }
-
- private void generateHFiles(Path outputDir) throws IOException {
- String hFileName = "MyHFile";
- int numRows = 1000;
- outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-
- byte[] from = Bytes.toBytes(CF_NAME + "begin");
- byte[] to = Bytes.toBytes(CF_NAME + "end");
-
- Path familyDir = new Path(outputDir, CF_NAME);
- HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new Path(familyDir, hFileName),
- Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows);
- }
-
private void waitForReplication(int durationInMillis) {
LOG.info("Waiting for replication to complete for {} ms",
durationInMillis);
try {
@@ -418,17 +370,12 @@ public class TestContinuousBackupReplicationEndpoint {
/**
* Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead Log) files were
- * generated in the backup directory. 2. Checking whether any bulk-loaded files were generated in
- * the backup directory. 3. Replaying the WAL and bulk-loaded files (if present) to restore data
- * and check consistency by verifying that the restored data matches the expected row count for
- * each table.
+ * generated in the backup directory. 2. Replaying the WAL files to restore data and check
+ * consistency by verifying that the restored data matches the expected row count for each table.
*/
- private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
- Map<TableName, Integer> tablesWithExpectedRows) throws IOException {
+ private void verifyBackup(String backupRootDir, Map<TableName, Integer> tablesWithExpectedRows)
+ throws IOException {
verifyWALBackup(backupRootDir);
- if (hasBulkLoadFiles) {
- verifyBulkLoadBackup(backupRootDir);
- }
for (Map.Entry<TableName, Integer> entry : tablesWithExpectedRows.entrySet()) {
TableName tableName = entry.getKey();
@@ -440,21 +387,6 @@ public class TestContinuousBackupReplicationEndpoint {
replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
- // replay Bulk loaded HFiles if Present
- try {
- Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
- if (fs.exists(bulkloadDir)) {
- FileStatus[] directories = fs.listStatus(bulkloadDir);
- for (FileStatus dirStatus : directories) {
- if (dirStatus.isDirectory()) {
- replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName);
- }
- }
- }
- } catch (Exception e) {
- fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
- }
-
assertEquals(expectedRows, getRowCount(tableName));
}
}
@@ -480,15 +412,6 @@ public class TestContinuousBackupReplicationEndpoint {
assertFalse("Expected some WAL files but found none!", walFiles.isEmpty());
}
- private void verifyBulkLoadBackup(String backupRootDir) throws IOException {
- Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
- assertTrue("BulkLoad Files directory does not exist!",
fs.exists(bulkLoadFilesDir));
-
- FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
- assertNotNull("No Bulk load files found!", bulkLoadFiles);
- assertTrue("Expected some Bulk load files but found none!",
bulkLoadFiles.length > 0);
- }
-
private void replayWALs(String walDir, TableName tableName) {
WALPlayer player = new WALPlayer();
try {
@@ -499,28 +422,6 @@ public class TestContinuousBackupReplicationEndpoint {
}
}
- private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName tableName) {
- try {
- Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName);
- if (fs.exists(tableBulkLoadDir)) {
- RemoteIterator<LocatedFileStatus> fileStatusIterator = fs.listFiles(tableBulkLoadDir, true);
- List<Path> bulkLoadFiles = new ArrayList<>();
-
- while (fileStatusIterator.hasNext()) {
- LocatedFileStatus fileStatus = fileStatusIterator.next();
- Path filePath = fileStatus.getPath();
-
- if (!fileStatus.isDirectory()) {
- bulkLoadFiles.add(filePath);
- }
- }
- bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), bulkLoadFiles));
- }
- } catch (Exception e) {
- fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
- }
- }
-
private int getRowCount(TableName tableName) throws IOException {
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
return HBaseTestingUtil.countRows(table);