This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 334accf4ff6 HBASE-28767 Simplify backup bulk-loading code (#6134)
334accf4ff6 is described below
commit 334accf4ff6472d6f3058ab89da499edab5fde73
Author: DieterDP <[email protected]>
AuthorDate: Fri Dec 13 14:43:13 2024 +0100
HBASE-28767 Simplify backup bulk-loading code (#6134)
Signed-off-by: Nick Dimiduk <[email protected]>
---
.../hadoop/hbase/backup/BackupHFileCleaner.java | 3 +-
.../apache/hadoop/hbase/backup/BackupObserver.java | 48 +++----
.../hadoop/hbase/backup/impl/BackupManager.java | 4 +-
.../hbase/backup/impl/BackupSystemTable.java | 153 +++++----------------
.../apache/hadoop/hbase/backup/impl/BulkLoad.java | 93 +++++++++++++
.../backup/impl/IncrementalTableBackupClient.java | 112 ++++++---------
6 files changed, 197 insertions(+), 216 deletions(-)
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
index 619cecaeaaa..d7474997412 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -103,7 +104,7 @@ public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abor
}
try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
- fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+ fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups());
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up,
skipping checking", ioe);
return Collections.emptyList();
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 73f97365adb..db8c29c4c0a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -64,21 +67,8 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
return;
}
- try (Connection connection = ConnectionFactory.createConnection(cfg);
- BackupSystemTable tbl = new BackupSystemTable(connection)) {
- List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
- RegionInfo info = ctx.getEnvironment().getRegionInfo();
- TableName tableName = info.getTable();
- if (!fullyBackedUpTables.contains(tableName)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(tableName + " has not gone thru full backup");
- }
- return;
- }
- tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
- } catch (IOException ioe) {
- LOG.error("Failed to get tables which have been fully backed up", ioe);
- }
+
+ registerBulkLoad(ctx, finalPaths);
}
@Override
@@ -89,19 +79,31 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
return;
}
+
+ List<Path> hfiles = new ArrayList<>(pairs.size());
+ for (Pair<Path, Path> pair : pairs) {
+ hfiles.add(pair.getSecond());
+ }
+ registerBulkLoad(ctx, Collections.singletonMap(family, hfiles));
+ }
+
+ private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+ Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
+ Configuration cfg = ctx.getEnvironment().getConfiguration();
+ RegionInfo region = ctx.getEnvironment().getRegionInfo();
+ TableName tableName = region.getTable();
+
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
- List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
- RegionInfo info = ctx.getEnvironment().getRegionInfo();
- TableName tableName = info.getTable();
- if (!fullyBackedUpTables.contains(tableName)) {
+ Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+
+ if (fullyBackedUpTables.contains(tableName)) {
+ tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
+ } else {
if (LOG.isTraceEnabled()) {
- LOG.trace(tableName + " has not gone thru full backup");
+ LOG.trace("Table {} has not gone through full backup - skipping.",
tableName);
}
- return;
}
- tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
- return;
}
}
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index e3644c7d7f0..5afd580a649 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -355,8 +354,7 @@ public class BackupManager implements Closeable {
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}
- public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
- readBulkloadRows(List<TableName> tableList) throws IOException {
+ public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 01106b4cd0e..203f3f61b0f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,10 +178,6 @@ public final class BackupSystemTable implements Closeable {
final static byte[] TBL_COL = Bytes.toBytes("tbl");
final static byte[] FAM_COL = Bytes.toBytes("fam");
final static byte[] PATH_COL = Bytes.toBytes("path");
- final static byte[] STATE_COL = Bytes.toBytes("state");
- // the two states a bulk loaded file can be
- final static byte[] BL_PREPARE = Bytes.toBytes("R");
- final static byte[] BL_COMMIT = Bytes.toBytes("D");
private final static String SET_KEY_PREFIX = "backupset:";
@@ -378,7 +373,7 @@ public final class BackupSystemTable implements Closeable {
}
files.add(new Path(path));
if (LOG.isDebugEnabled()) {
- LOG.debug("found bulk loaded file : " + tbl + " " +
Bytes.toString(fam) + " " + path);
+ LOG.debug("found bulk loaded file : {} {} {}", tbl,
Bytes.toString(fam), path);
}
}
@@ -401,43 +396,22 @@ public final class BackupSystemTable implements Closeable {
}
}
- /*
- * For postBulkLoadHFile() hook.
- * @param tabName table name
- * @param region the region receiving hfile
- * @param finalPaths family and associated hfiles
+ /**
+ * Registers a bulk load.
+ * @param tableName table name
+ * @param region the region receiving hfile
+ * @param cfToHfilePath column family and associated hfiles
*/
- public void writePathsPostBulkLoad(TableName tabName, byte[] region,
- Map<byte[], List<Path>> finalPaths) throws IOException {
+ public void registerBulkLoad(TableName tableName, byte[] region,
+ Map<byte[], List<Path>> cfToHfilePath) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
finalPaths.size()
- + " entries");
+ LOG.debug("Writing bulk load descriptor to backup {} with {} entries",
tableName,
+ cfToHfilePath.size());
}
try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
- List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
+ List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
bufferedMutator.mutate(puts);
- LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
- }
- }
-
- /*
- * For preCommitStoreFile() hook
- * @param tabName table name
- * @param region the region receiving hfile
- * @param family column family
- * @param pairs list of paths for hfiles
- */
- public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
- final List<Pair<Path, Path>> pairs) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "write bulk load descriptor to backup " + tabName + " with " +
pairs.size() + " entries");
- }
- try (Table table = connection.getTable(bulkLoadTableName)) {
- List<Put> puts =
- BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
- table.put(puts);
- LOG.debug("written " + puts.size() + " rows for bulk load of " +
tabName);
+ LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
}
}
@@ -459,33 +433,25 @@ public final class BackupSystemTable implements Closeable {
}
}
- /*
+ /**
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
- * @return The keys of the Map are table, region and column family. Value of the map reflects
- * whether the hfile was recorded by preCommitStoreFile hook (true)
- */
- public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
- readBulkloadRows(List<TableName> tableList) throws IOException {
-
- Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
- List<byte[]> rows = new ArrayList<>();
- for (TableName tTable : tableList) {
- Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
- Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
- try (Table table = connection.getTable(bulkLoadTableName);
- ResultScanner scanner = table.getScanner(scan)) {
- Result res = null;
+ */
+ public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+ List<BulkLoad> result = new ArrayList<>();
+ for (TableName table : tableList) {
+ Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
+ try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
+ ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
+ Result res;
while ((res = scanner.next()) != null) {
res.advance();
String fam = null;
String path = null;
- boolean raw = false;
- byte[] row;
String region = null;
+ byte[] row = null;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
- rows.add(row);
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -498,35 +464,14 @@ public final class BackupSystemTable implements Closeable {
BackupSystemTable.PATH_COL.length) == 0
) {
path = Bytes.toString(CellUtil.cloneValue(cell));
- } else if (
- CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
- BackupSystemTable.STATE_COL.length) == 0
- ) {
- byte[] state = CellUtil.cloneValue(cell);
- if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
- raw = true;
- } else {
- raw = false;
- }
}
}
- if (map.get(tTable) == null) {
- map.put(tTable, new HashMap<>());
- tblMap = map.get(tTable);
- }
- if (tblMap.get(region) == null) {
- tblMap.put(region, new HashMap<>());
- }
- Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
- if (famMap.get(fam) == null) {
- famMap.put(fam, new ArrayList<>());
- }
- famMap.get(fam).add(new Pair<>(path, raw));
+ result.add(new BulkLoad(table, region, fam, path, row));
LOG.debug("found orig " + path + " for " + fam + " of table " +
region);
}
}
}
- return new Pair<>(map, rows);
+ return result;
}
/*
@@ -793,20 +738,19 @@ public final class BackupSystemTable implements Closeable {
return result;
}
- /*
- * Retrieve TableName's for completed backup of given type
- * @param type backup type
- * @return List of table names
+ /**
+ * Retrieve all table names that are part of any known backup
*/
- public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+ public Set<TableName> getTablesIncludedInBackups() throws IOException {
Set<TableName> names = new HashSet<>();
List<BackupInfo> infos = getBackupHistory(true);
for (BackupInfo info : infos) {
- if (info.getType() == type) {
+ // Incremental backups have the same tables as the preceding full backups
+ if (info.getType() == BackupType.FULL) {
names.addAll(info.getTableNames());
}
}
- return new ArrayList<>(names);
+ return names;
}
/**
@@ -1500,13 +1444,13 @@ public final class BackupSystemTable implements Closeable {
return s.substring(index + 1);
}
- /*
- * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
+ /**
+ * Creates Put's for bulk loads.
*/
- static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
- Map<byte[], List<Path>> finalPaths) {
+ private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
+ Map<byte[], List<Path>> columnFamilyToHFilePaths) {
List<Put> puts = new ArrayList<>();
- for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+ for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
for (Path path : entry.getValue()) {
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
@@ -1516,10 +1460,8 @@ public final class BackupSystemTable implements Closeable {
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
- put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
puts.add(put);
- LOG
- .debug("writing done bulk path " + file + " for " + table + " " +
Bytes.toString(region));
+ LOG.debug("Done writing bulk path {} for {} {}", file, table,
Bytes.toString(region));
}
}
return puts;
@@ -1580,29 +1522,6 @@ public final class BackupSystemTable implements Closeable {
}
}
- /*
- * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
- */
- static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
- final List<Pair<Path, Path>> pairs) {
- List<Put> puts = new ArrayList<>(pairs.size());
- for (Pair<Path, Path> pair : pairs) {
- Path path = pair.getSecond();
- String file = path.toString();
- int lastSlash = file.lastIndexOf("/");
- String filename = file.substring(lastSlash + 1);
- Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
- Bytes.toString(region), BLK_LD_DELIM, filename));
- put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
- put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
- put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
- put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
- puts.add(put);
- LOG.debug("writing raw bulk path " + file + " for " + table + " " +
Bytes.toString(region));
- }
- return puts;
- }
-
public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
List<Delete> lstDels = new ArrayList<>(lst.size());
for (TableName table : lst) {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
new file mode 100644
index 00000000000..0f1e79c976b
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The data corresponding to a single bulk-loaded file that is being tracked
by the backup logic.
+ */
[email protected]
+public class BulkLoad {
+ private final TableName tableName;
+ private final String region;
+ private final String columnFamily;
+ private final String hfilePath;
+ private final byte[] rowKey;
+
+ public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
+ byte[] rowKey) {
+ this.tableName = tableName;
+ this.region = region;
+ this.columnFamily = columnFamily;
+ this.hfilePath = hfilePath;
+ this.rowKey = rowKey;
+ }
+
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
+ public String getHfilePath() {
+ return hfilePath;
+ }
+
+ public byte[] getRowKey() {
+ return rowKey;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BulkLoad)) {
+ return false;
+ }
+ BulkLoad that = (BulkLoad) o;
+ return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
+ .append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
+ .append(rowKey, that.rowKey).isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
+ .append(hfilePath).append(rowKey).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
+ .append("tableName", tableName).append("region",
region).append("columnFamily", columnFamily)
+ .append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
+ }
+}
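
For readers of this patch, a minimal usage sketch (not part of the commit) of how the new BulkLoad value objects returned by BackupSystemTable.readBulkloadRows() can be consumed, and how their row keys are collected for later cleanup, mirroring the handleBulkLoad() change further down. The class BulkLoadUsageSketch, its method name, and the table list passed in are assumptions made for this illustration; only the BulkLoad and BackupSystemTable calls come from the patch itself.

// Illustrative sketch only, under the assumptions stated above.
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public class BulkLoadUsageSketch {
  static void printTrackedBulkLoads(Connection connection, List<TableName> tables)
      throws IOException {
    try (BackupSystemTable systemTable = new BackupSystemTable(connection)) {
      // Each BulkLoad describes one tracked hfile: table, region, column family, path, row key.
      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(tables);
      for (BulkLoad bulkLoad : bulkLoads) {
        System.out.println(bulkLoad.getTableName() + " / " + bulkLoad.getRegion() + " / "
            + bulkLoad.getColumnFamily() + " -> " + bulkLoad.getHfilePath());
      }
      // Row keys can be collected for deletion once a backup completes, as the
      // incremental backup client does with deleteBulkLoadedRows().
      List<byte[]> rowKeys = Lists.transform(bulkLoads, BulkLoad::getRowKey);
      System.out.println("tracked rows: " + rowKeys.size());
    }
  }
}

The flat List<BulkLoad> replaces the previous nested Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> plus row-key list, which is what allows the caller code above to stay a single loop.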
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 6ad487337c2..03a6ecc02f3 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,16 +48,16 @@ import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
@@ -110,22 +110,17 @@ public class IncrementalTableBackupClient extends TableBackupClient {
return -1;
}
- /*
+ /**
* Reads bulk load records from backup table, iterates through the records and forms the paths for
* bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination. This method does NOT
* clean up the entries in the bulk load system table. Those entries should not be cleaned until
* the backup is marked as complete.
- * @param sTableList list of tables to be backed up
- * @return the rowkeys of bulk loaded files
+ * @param tablesToBackup list of tables to be backed up
*/
- @SuppressWarnings("unchecked")
- protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {
- Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
List<String> activeFiles = new ArrayList<>();
List<String> archiveFiles = new ArrayList<>();
- Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
- backupManager.readBulkloadRows(sTableList);
- Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
+ List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -135,74 +130,46 @@ public class IncrementalTableBackupClient extends TableBackupClient {
Path rootdir = CommonFSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
- for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
- .entrySet()) {
- TableName srcTable = tblEntry.getKey();
+ for (BulkLoad bulkLoad : bulkLoads) {
+ TableName srcTable = bulkLoad.getTableName();
+ String regionName = bulkLoad.getRegion();
+ String fam = bulkLoad.getColumnFamily();
+ String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
- int srcIdx = getIndex(srcTable, sTableList);
- if (srcIdx < 0) {
- LOG.warn("Couldn't find " + srcTable + " in source table List");
+ if (!tablesToBackup.contains(srcTable)) {
+ LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
continue;
}
- if (mapForSrc[srcIdx] == null) {
- mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- }
Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
- Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
- srcTable.getQualifierAsString());
- for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry : tblEntry
- .getValue().entrySet()) {
- String regionName = regionEntry.getKey();
- Path regionDir = new Path(tblDir, regionName);
- // map from family to List of hfiles
- for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry : regionEntry.getValue()
- .entrySet()) {
- String fam = famEntry.getKey();
- Path famDir = new Path(regionDir, fam);
- List<Path> files;
- if (!mapForSrc[srcIdx].containsKey(Bytes.toBytes(fam))) {
- files = new ArrayList<>();
- mapForSrc[srcIdx].put(Bytes.toBytes(fam), files);
- } else {
- files = mapForSrc[srcIdx].get(Bytes.toBytes(fam));
- }
- Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
- String tblName = srcTable.getQualifierAsString();
- Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
- if (!tgtFs.mkdirs(tgtFam)) {
- throw new IOException("couldn't create " + tgtFam);
- }
- for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
- String file = fileWithState.getFirst();
- int idx = file.lastIndexOf("/");
- String filename = file;
- if (idx > 0) {
- filename = file.substring(idx + 1);
- }
- Path p = new Path(famDir, filename);
- Path tgt = new Path(tgtFam, filename);
- Path archive = new Path(archiveDir, filename);
- if (fs.exists(p)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("found bulk hfile " + file + " in " + famDir + " for
" + tblName);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("copying " + p + " to " + tgt);
- }
- activeFiles.add(p.toString());
- } else if (fs.exists(archive)) {
- LOG.debug("copying archive " + archive + " to " + tgt);
- archiveFiles.add(archive.toString());
- }
- files.add(tgt);
- }
+ Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+
+ String srcTableQualifier = srcTable.getQualifierAsString();
+ String srcTableNs = srcTable.getNamespaceAsString();
+ Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+ + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+ if (!tgtFs.mkdirs(tgtFam)) {
+ throw new IOException("couldn't create " + tgtFam);
+ }
+ Path tgt = new Path(tgtFam, filename);
+
+ Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+ Path archive = new Path(archiveDir, filename);
+
+ if (fs.exists(p)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("found bulk hfile {} in {} for {}",
bulkLoad.getHfilePath(), p.getParent(),
+ srcTableQualifier);
+ LOG.trace("copying {} to {}", p, tgt);
}
+ activeFiles.add(p.toString());
+ } else if (fs.exists(archive)) {
+ LOG.debug("copying archive {} to {}", archive, tgt);
+ archiveFiles.add(archive.toString());
}
}
copyBulkLoadedFiles(activeFiles, archiveFiles);
-
- return pair.getSecond();
+ return bulkLoads;
}
private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
@@ -327,11 +294,12 @@ public class IncrementalTableBackupClient extends TableBackupClient {
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
- List<byte[]> bulkLoadedRows = handleBulkLoad(backupInfo.getTableNames());
+ List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());
// backup complete
completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);
+ List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
backupManager.deleteBulkLoadedRows(bulkLoadedRows);
} catch (IOException e) {
failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",