http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java new file mode 100644 index 0000000..b8adac9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java @@ -0,0 +1,666 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +/** + * Backup manifest contains all the metadata of a backup image. The manifest info is bundled + * as a manifest file together with the data, so that each backup image contains all the info needed + * for restore. BackupManifest is a storage container for BackupImage. + * It is responsible for storing/reading backup image data and has some additional utility methods.
+ * + */ +@InterfaceAudience.Private +public class BackupManifest { + + private static final Log LOG = LogFactory.getLog(BackupManifest.class); + + // manifest file name + public static final String MANIFEST_FILE_NAME = ".backup.manifest"; + + /** + * Backup image. The dependency graph is made up of a series of backup images. + * BackupImage contains all the relevant information needed to restore the backup + * and is used during the restore operation. + */ + public static class BackupImage implements Comparable<BackupImage> { + + static class Builder { + BackupImage image; + + Builder() { + image = new BackupImage(); + } + + Builder withBackupId(String backupId) { + image.setBackupId(backupId); + return this; + } + + Builder withType(BackupType type) { + image.setType(type); + return this; + } + + Builder withRootDir(String rootDir) { + image.setRootDir(rootDir); + return this; + } + + Builder withTableList(List<TableName> tableList) { + image.setTableList(tableList); + return this; + } + + Builder withStartTime(long startTime) { + image.setStartTs(startTime); + return this; + } + + Builder withCompleteTime(long completeTime) { + image.setCompleteTs(completeTime); + return this; + } + + BackupImage build() { + return image; + } + + } + + private String backupId; + private BackupType type; + private String rootDir; + private List<TableName> tableList; + private long startTs; + private long completeTs; + private ArrayList<BackupImage> ancestors; + private HashMap<TableName, HashMap<String, Long>> incrTimeRanges; + + static Builder newBuilder() { + return new Builder(); + } + + public BackupImage() { + super(); + } + + private BackupImage(String backupId, BackupType type, String rootDir, + List<TableName> tableList, long startTs, long completeTs) { + this.backupId = backupId; + this.type = type; + this.rootDir = rootDir; + this.tableList = tableList; + this.startTs = startTs; + this.completeTs = completeTs; + } + + static BackupImage fromProto(BackupProtos.BackupImage im) { + String backupId = im.getBackupId(); + String rootDir = im.getBackupRootDir(); + long startTs = im.getStartTs(); + long completeTs = im.getCompleteTs(); + List<HBaseProtos.TableName> tableListList = im.getTableListList(); + List<TableName> tableList = new ArrayList<TableName>(); + for (HBaseProtos.TableName tn : tableListList) { + tableList.add(ProtobufUtil.toTableName(tn)); + } + + List<BackupProtos.BackupImage> ancestorList = im.getAncestorsList(); + + BackupType type = + im.getBackupType() == BackupProtos.BackupType.FULL ?
BackupType.FULL + : BackupType.INCREMENTAL; + + BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs); + for (BackupProtos.BackupImage img : ancestorList) { + image.addAncestor(fromProto(img)); + } + image.setIncrTimeRanges(loadIncrementalTimestampMap(im)); + return image; + } + + BackupProtos.BackupImage toProto() { + BackupProtos.BackupImage.Builder builder = BackupProtos.BackupImage.newBuilder(); + builder.setBackupId(backupId); + builder.setCompleteTs(completeTs); + builder.setStartTs(startTs); + builder.setBackupRootDir(rootDir); + if (type == BackupType.FULL) { + builder.setBackupType(BackupProtos.BackupType.FULL); + } else { + builder.setBackupType(BackupProtos.BackupType.INCREMENTAL); + } + + for (TableName name : tableList) { + builder.addTableList(ProtobufUtil.toProtoTableName(name)); + } + + if (ancestors != null) { + for (BackupImage im : ancestors) { + builder.addAncestors(im.toProto()); + } + } + + setIncrementalTimestampMap(builder); + return builder.build(); + } + + private static HashMap<TableName, HashMap<String, Long>> loadIncrementalTimestampMap( + BackupProtos.BackupImage proto) { + List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList(); + + HashMap<TableName, HashMap<String, Long>> incrTimeRanges = + new HashMap<TableName, HashMap<String, Long>>(); + if (list == null || list.size() == 0) return incrTimeRanges; + for (BackupProtos.TableServerTimestamp tst : list) { + TableName tn = ProtobufUtil.toTableName(tst.getTableName()); + HashMap<String, Long> map = incrTimeRanges.get(tn); + if (map == null) { + map = new HashMap<String, Long>(); + incrTimeRanges.put(tn, map); + } + List<BackupProtos.ServerTimestamp> listSt = tst.getServerTimestampList(); + for (BackupProtos.ServerTimestamp stm : listSt) { + ServerName sn = ProtobufUtil.toServerName(stm.getServerName()); + map.put(sn.getHostname() + ":" + sn.getPort(), stm.getTimestamp()); + } + } + return incrTimeRanges; + } + + private void setIncrementalTimestampMap(BackupProtos.BackupImage.Builder builder) { + if (this.incrTimeRanges == null) { + return; + } + for (Entry<TableName, HashMap<String, Long>> entry : this.incrTimeRanges.entrySet()) { + TableName key = entry.getKey(); + HashMap<String, Long> value = entry.getValue(); + BackupProtos.TableServerTimestamp.Builder tstBuilder = + BackupProtos.TableServerTimestamp.newBuilder(); + tstBuilder.setTableName(ProtobufUtil.toProtoTableName(key)); + + for (Map.Entry<String, Long> entry2 : value.entrySet()) { + String s = entry2.getKey(); + BackupProtos.ServerTimestamp.Builder stBuilder = + BackupProtos.ServerTimestamp.newBuilder(); + HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); + ServerName sn = ServerName.parseServerName(s); + snBuilder.setHostName(sn.getHostname()); + snBuilder.setPort(sn.getPort()); + stBuilder.setServerName(snBuilder.build()); + stBuilder.setTimestamp(entry2.getValue()); + tstBuilder.addServerTimestamp(stBuilder.build()); + } + builder.addTstMap(tstBuilder.build()); + } + } + + public String getBackupId() { + return backupId; + } + + private void setBackupId(String backupId) { + this.backupId = backupId; + } + + public BackupType getType() { + return type; + } + + private void setType(BackupType type) { + this.type = type; + } + + public String getRootDir() { + return rootDir; + } + + private void setRootDir(String rootDir) { + this.rootDir = rootDir; + } + + public List<TableName> getTableNames() { + return tableList; + } + + private void setTableList(List<TableName> 
tableList) { + this.tableList = tableList; + } + + public long getStartTs() { + return startTs; + } + + private void setStartTs(long startTs) { + this.startTs = startTs; + } + + public long getCompleteTs() { + return completeTs; + } + + private void setCompleteTs(long completeTs) { + this.completeTs = completeTs; + } + + public ArrayList<BackupImage> getAncestors() { + if (this.ancestors == null) { + this.ancestors = new ArrayList<BackupImage>(); + } + return this.ancestors; + } + + private void addAncestor(BackupImage backupImage) { + this.getAncestors().add(backupImage); + } + + public boolean hasAncestor(String token) { + for (BackupImage image : this.getAncestors()) { + if (image.getBackupId().equals(token)) { + return true; + } + } + return false; + } + + public boolean hasTable(TableName table) { + return tableList.contains(table); + } + + @Override + public int compareTo(BackupImage other) { + String thisBackupId = this.getBackupId(); + String otherBackupId = other.getBackupId(); + int index1 = thisBackupId.lastIndexOf("_"); + int index2 = otherBackupId.lastIndexOf("_"); + String name1 = thisBackupId.substring(0, index1); + String name2 = otherBackupId.substring(0, index2); + if (name1.equals(name2)) { + Long thisTS = Long.valueOf(thisBackupId.substring(index1 + 1)); + Long otherTS = Long.valueOf(otherBackupId.substring(index2 + 1)); + return thisTS.compareTo(otherTS); + } else { + return name1.compareTo(name2); + } + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof BackupImage) { + return this.compareTo((BackupImage) obj) == 0; + } + return false; + } + + @Override + public int hashCode() { + int hash = 33 * this.getBackupId().hashCode() + type.hashCode(); + hash = 33 * hash + rootDir.hashCode(); + hash = 33 * hash + Long.valueOf(startTs).hashCode(); + hash = 33 * hash + Long.valueOf(completeTs).hashCode(); + for (TableName table : tableList) { + hash = 33 * hash + table.hashCode(); + } + return hash; + } + + public HashMap<TableName, HashMap<String, Long>> getIncrTimeRanges() { + return incrTimeRanges; + } + + private void setIncrTimeRanges(HashMap<TableName, HashMap<String, Long>> incrTimeRanges) { + this.incrTimeRanges = incrTimeRanges; + } + } + + // backup image directory + private String tableBackupDir = null; + private BackupImage backupImage; + + /** + * Construct a manifest for an ongoing backup. + * @param backup The ongoing backup info + */ + public BackupManifest(BackupInfo backup) { + + BackupImage.Builder builder = BackupImage.newBuilder(); + this.backupImage = + builder.withBackupId(backup.getBackupId()).withType(backup.getType()) + .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames()) + .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build(); + } + + /** + * Construct a table-level manifest for a backup of the named table. + * @param backup The ongoing backup session info + * @param table the table covered by the manifest + */ + public BackupManifest(BackupInfo backup, TableName table) { + this.tableBackupDir = backup.getTableBackupDir(table); + List<TableName> tables = new ArrayList<TableName>(); + tables.add(table); + BackupImage.Builder builder = BackupImage.newBuilder(); + this.backupImage = + builder.withBackupId(backup.getBackupId()).withType(backup.getType()) + .withRootDir(backup.getBackupRootDir()).withTableList(tables) + .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build(); + } + + /** + * Construct a manifest from a backup directory.
+ * @param conf configuration + * @param backupPath backup path + * @throws IOException exception + */ + public BackupManifest(Configuration conf, Path backupPath) throws IOException { + this(backupPath.getFileSystem(conf), backupPath); + } + + /** + * Construct a manifest from a backup directory. + * @param fs the FileSystem + * @param backupPath backup path + * @throws BackupException exception + */ + public BackupManifest(FileSystem fs, Path backupPath) throws BackupException { + if (LOG.isDebugEnabled()) { + LOG.debug("Loading manifest from: " + backupPath.toString()); + } + // The input backupDir may not exactly be the backup table dir. + // It could be the backup log dir where there is also a manifest file stored. + // This variable's purpose is to keep the correct and original location so + // that we can store/persist it. + try { + + FileStatus[] subFiles = BackupUtils.listStatus(fs, backupPath, null); + if (subFiles == null) { + String errorMsg = backupPath.toString() + " does not exist"; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + for (FileStatus subFile : subFiles) { + if (subFile.getPath().getName().equals(MANIFEST_FILE_NAME)) { + + // load and set manifest field from file content + long len = subFile.getLen(); + byte[] pbBytes = new byte[(int) len]; + try (FSDataInputStream in = fs.open(subFile.getPath())) { + in.readFully(pbBytes); + } + BackupProtos.BackupImage proto = null; + try { + proto = BackupProtos.BackupImage.parseFrom(pbBytes); + } catch (Exception e) { + throw new BackupException(e); + } + this.backupImage = BackupImage.fromProto(proto); + LOG.debug("Loaded manifest instance from manifest file: " + + BackupUtils.getPath(subFile.getPath())); + return; + } + } + String errorMsg = "No manifest file found in: " + backupPath.toString(); + throw new IOException(errorMsg); + + } catch (IOException e) { + throw new BackupException(e); + } + } + + public BackupType getType() { + return backupImage.getType(); + } + + /** + * Get the table set of this image. + * @return The table set list + */ + public List<TableName> getTableList() { + return backupImage.getTableNames(); + } + + /** + * Persist the manifest file. + * @throws BackupException if the manifest file cannot be stored + */ + public void store(Configuration conf) throws BackupException { + byte[] data = backupImage.toProto().toByteArray(); + // write the file, overwrite if it already exists + String logBackupDir = + BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId()); + Path manifestFilePath = + new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)), + MANIFEST_FILE_NAME); + try (FSDataOutputStream out = + manifestFilePath.getFileSystem(conf).create(manifestFilePath, true)) { + out.write(data); + } catch (IOException e) { + throw new BackupException(e); + } + + LOG.info("Manifest file stored to " + manifestFilePath); + } + + /** + * Get this backup image. + * @return the backup image. + */ + public BackupImage getBackupImage() { + return backupImage; + } + + /** + * Add dependent backup image for this backup. + * @param image The direct dependent backup image + */ + public void addDependentImage(BackupImage image) { + this.backupImage.addAncestor(image); + } + + /** + * Set the incremental timestamp map directly.
+ * @param incrTimestampMap timestamp map + */ + public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) { + this.backupImage.setIncrTimeRanges(incrTimestampMap); + } + + public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() { + return backupImage.getIncrTimeRanges(); + } + + /** + * Get the image list of this backup for restore in time order. + * @param reverse If true, then output in reverse order, otherwise in time order from old to new + * @return the backup image list for restore in time order + */ + public ArrayList<BackupImage> getRestoreDependentList(boolean reverse) { + TreeMap<Long, BackupImage> restoreImages = new TreeMap<Long, BackupImage>(); + restoreImages.put(backupImage.startTs, backupImage); + for (BackupImage image : backupImage.getAncestors()) { + restoreImages.put(Long.valueOf(image.startTs), image); + } + return new ArrayList<BackupImage>(reverse ? (restoreImages.descendingMap().values()) + : (restoreImages.values())); + } + + /** + * Get the dependent image list for a specific table of this backup in time order from old to new, + * if one wants to restore to this backup image level. + * @param table table + * @return the backup image list for a table in time order + */ + public ArrayList<BackupImage> getDependentListByTable(TableName table) { + ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>(); + ArrayList<BackupImage> imageList = getRestoreDependentList(true); + for (BackupImage image : imageList) { + if (image.hasTable(table)) { + tableImageList.add(image); + if (image.getType() == BackupType.FULL) { + break; + } + } + } + Collections.reverse(tableImageList); + return tableImageList; + } + + /** + * Get the full dependent image list in the whole dependency scope for a specific table of this + * backup in time order from old to new. + * @param table table + * @return the full backup image list for a table in time order in the whole scope of the + *         dependency of this image + */ + public ArrayList<BackupImage> getAllDependentListByTable(TableName table) { + ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>(); + ArrayList<BackupImage> imageList = getRestoreDependentList(false); + for (BackupImage image : imageList) { + if (image.hasTable(table)) { + tableImageList.add(image); + } + } + return tableImageList; + } + + /** + * Check whether backup image1 could cover backup image2 or not. + * @param image1 backup image 1 + * @param image2 backup image 2 + * @return true if image1 can cover image2, otherwise false + */ + public static boolean canCoverImage(BackupImage image1, BackupImage image2) { + // image1 can cover image2 only when the following conditions are satisfied: + // - image1 must not be an incremental image; + // - image1 must be taken after image2 has been taken; + // - table set of image1 must cover the table set of image2.
+ if (image1.getType() == BackupType.INCREMENTAL) { + return false; + } + if (image1.getStartTs() < image2.getStartTs()) { + return false; + } + List<TableName> image1TableList = image1.getTableNames(); + List<TableName> image2TableList = image2.getTableNames(); + boolean found = false; + for (int i = 0; i < image2TableList.size(); i++) { + found = false; + for (int j = 0; j < image1TableList.size(); j++) { + if (image2TableList.get(i).equals(image1TableList.get(j))) { + found = true; + break; + } + } + if (!found) { + return false; + } + } + + LOG.debug("Backup image " + image1.getBackupId() + " can cover " + image2.getBackupId()); + return true; + } + + /** + * Check whether backup image set could cover a backup image or not. + * @param fullImages The backup image set + * @param image The target backup image + * @return true if fullImages can cover image, otherwise false + */ + public static boolean canCoverImage(ArrayList<BackupImage> fullImages, BackupImage image) { + // fullImages can cover image only when the following conditions are satisfied: + // - each image of fullImages must not be an incremental image; + // - each image of fullImages must be taken after image has been taken; + // - sum table set of fullImages must cover the table set of image. + for (BackupImage image1 : fullImages) { + if (image1.getType() == BackupType.INCREMENTAL) { + return false; + } + if (image1.getStartTs() < image.getStartTs()) { + return false; + } + } + + ArrayList<String> image1TableList = new ArrayList<String>(); + for (BackupImage image1 : fullImages) { + List<TableName> tableList = image1.getTableNames(); + for (TableName table : tableList) { + image1TableList.add(table.getNameAsString()); + } + } + ArrayList<String> image2TableList = new ArrayList<String>(); + List<TableName> tableList = image.getTableNames(); + for (TableName table : tableList) { + image2TableList.add(table.getNameAsString()); + } + + for (int i = 0; i < image2TableList.size(); i++) { + if (image1TableList.contains(image2TableList.get(i)) == false) { + return false; + } + } + + LOG.debug("Full image set can cover image " + image.getBackupId()); + return true; + } + + public BackupInfo toBackupInfo() { + BackupInfo info = new BackupInfo(); + info.setType(backupImage.getType()); + List<TableName> list = backupImage.getTableNames(); + TableName[] tables = new TableName[list.size()]; + info.addTables(list.toArray(tables)); + info.setBackupId(backupImage.getBackupId()); + info.setStartTs(backupImage.getStartTs()); + info.setBackupRootDir(backupImage.getRootDir()); + if (backupImage.getType() == BackupType.INCREMENTAL) { + info.setHLogTargetDir(BackupUtils.getLogBackupDir(backupImage.getRootDir(), + backupImage.getBackupId())); + } + return info; + } +}
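[For reference, a minimal usage sketch of the BackupManifest API above. This is an editorial illustration, not part of the committed patch; it assumes a caller in the same org.apache.hadoop.hbase.backup.impl package (BackupImage.newBuilder() and its Builder are package-private), and the backup ids, root dir, and timestamps are invented.]

    import java.util.Arrays;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupType;
    import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;

    // A FULL image taken later, over a superset of tables...
    BackupImage full = BackupImage.newBuilder()
        .withBackupId("backup_1500000002000")
        .withType(BackupType.FULL)
        .withRootDir("hdfs://ns1/backup")
        .withTableList(Arrays.asList(TableName.valueOf("t1"), TableName.valueOf("t2")))
        .withStartTime(1500000002000L)
        .withCompleteTime(1500000003000L)
        .build();

    // ...can cover an earlier image over a subset of those tables,
    // per the three conditions checked in canCoverImage() above.
    BackupImage earlier = BackupImage.newBuilder()
        .withBackupId("backup_1500000000000")
        .withType(BackupType.INCREMENTAL)
        .withRootDir("hdfs://ns1/backup")
        .withTableList(Arrays.asList(TableName.valueOf("t1")))
        .withStartTime(1500000000000L)
        .withCompleteTime(1500000001000L)
        .build();

    boolean covered = BackupManifest.canCoverImage(full, earlier); // true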
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java new file mode 100644 index 0000000..6362f8e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -0,0 +1,1376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * This class provides API to access backup system table<br> + * + * Backup system table schema:<br> + * <p><ul> + * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> + * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> + * <li>3. 
Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li> + * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; + * value = map[RS-> last WAL timestamp]</li> + * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> + * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; + * value = backupId and full WAL file name</li> + * </ul></p> + */ +@InterfaceAudience.Private +public final class BackupSystemTable implements Closeable { + + static class WALItem { + String backupId; + String walFile; + String backupRoot; + + WALItem(String backupId, String walFile, String backupRoot) { + this.backupId = backupId; + this.walFile = walFile; + this.backupRoot = backupRoot; + } + + public String getBackupId() { + return backupId; + } + + public String getWalFile() { + return walFile; + } + + public String getBackupRoot() { + return backupRoot; + } + + @Override + public String toString() { + return Path.SEPARATOR + backupRoot + Path.SEPARATOR + backupId + Path.SEPARATOR + walFile; + } + + } + + private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); + + private TableName tableName; + /** + * Stores backup sessions (contexts) + */ + final static byte[] SESSIONS_FAMILY = "session".getBytes(); + /** + * Stores other meta + */ + final static byte[] META_FAMILY = "meta".getBytes(); + /** + * Connection to HBase cluster, shared among all instances + */ + private final Connection connection; + + private final static String BACKUP_INFO_PREFIX = "session:"; + private final static String START_CODE_ROW = "startcode:"; + private final static String INCR_BACKUP_SET = "incrbackupset:"; + private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; + private final static String RS_LOG_TS_PREFIX = "rslogts:"; + private final static String WALS_PREFIX = "wals:"; + private final static String SET_KEY_PREFIX = "backupset:"; + + private final static byte[] EMPTY_VALUE = new byte[] {}; + + // Safe delimiter in a string + private final static String NULL = "\u0000"; + + public BackupSystemTable(Connection conn) throws IOException { + this.connection = conn; + tableName = BackupSystemTable.getTableName(conn.getConfiguration()); + checkSystemTable(); + } + + private void checkSystemTable() throws IOException { + try (Admin admin = connection.getAdmin()) { + + if (!admin.tableExists(tableName)) { + HTableDescriptor backupHTD = + BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration()); + admin.createTable(backupHTD); + } + waitForSystemTable(admin); + } + } + + private void waitForSystemTable(Admin admin) throws IOException { + long TIMEOUT = 60000; + long startTime = EnvironmentEdgeManager.currentTime(); + while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for backup system table", e); + } + if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { + throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); + } + } + LOG.debug("Backup table exists and is available"); + } + + @Override + public void close() { + // do nothing + } + + /** + * Updates status (state) of a backup session in backup system table + * @param info backup info + * @throws IOException exception + */ + public void updateBackupInfo(BackupInfo info) throws IOException { + + if (LOG.isTraceEnabled()) { + LOG.trace("update backup status in backup system table for: " + info.getBackupId() + + " set status=" + info.getState()); +
} + try (Table table = connection.getTable(tableName)) { + Put put = createPutForBackupInfo(info); + table.put(put); + } + } + + /** + * Deletes backup status from backup system table + * @param backupId backup id + * @throws IOException exception + */ + public void deleteBackupInfo(String backupId) throws IOException { + + if (LOG.isTraceEnabled()) { + LOG.trace("delete backup status in backup system table for " + backupId); + } + try (Table table = connection.getTable(tableName)) { + Delete del = createDeleteForBackupInfo(backupId); + table.delete(del); + } + } + + /** + * Reads backup status object (instance of backup info) from backup system table + * @param backupId backup id + * @return Current status of backup session or null + */ + public BackupInfo readBackupInfo(String backupId) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read backup status from backup system table for: " + backupId); + } + + try (Table table = connection.getTable(tableName)) { + Get get = createGetForBackupInfo(backupId); + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + return resultToBackupInfo(res); + } + } + + /** + * Read the last backup start code (timestamp) of the last successful backup. Will return null if + * there is no start code stored in HBase or the value is of length 0. These two cases indicate + * there is no successful backup completed so far. + * @param backupRoot directory path to backup destination + * @return the timestamp of the last successful backup + * @throws IOException exception + */ + public String readBackupStartCode(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read backup start code from backup system table"); + } + try (Table table = connection.getTable(tableName)) { + Get get = createGetForStartCode(backupRoot); + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val); + } + } + + /** + * Write the start code (timestamp) to backup system table. + * @param startCode start code + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("write backup start code to backup system table " + startCode); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForStartCode(startCode.toString(), backupRoot); + table.put(put); + } + } + + /** + * Get the Region Servers' log information after the last log roll from backup system table.
+ * @param backupRoot root directory path to backup + * @return RS log info + * @throws IOException exception + */ + public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read region server last roll log result from backup system table"); + } + + Scan scan = createScanForReadRegionServerLastLogRollResult(backupRoot); + + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>(); + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String server = + getServerNameForReadRegionServerLastLogRollResult(row); + byte[] data = CellUtil.cloneValue(cell); + rsTimestampMap.put(server, Bytes.toLong(data)); + } + return rsTimestampMap; + } + } + + /** + * Writes Region Server last roll log result (timestamp) to backup system table + * @param server Region Server name + * @param ts last log timestamp + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("write region server last roll log result to backup system table"); + } + try (Table table = connection.getTable(tableName)) { + Put put = + createPutForRegionServerLastLogRollResult(server, ts, backupRoot); + table.put(put); + } + } + + /** + * Get backup history (in descending order by time) + * @param onlyCompleted true, if only successfully completed sessions + * @return list of backup info records + * @throws IOException exception + */ + public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get backup history from backup system table"); + } + ArrayList<BackupInfo> list; + BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY; + list = getBackupInfos(state); + return BackupUtils.sortHistoryListDesc(list); + } + + /** + * Get all backup history + * @return list of backup info + * @throws IOException + */ + public List<BackupInfo> getBackupHistory() throws IOException { + return getBackupHistory(false); + } + + /** + * Get first n backup history records + * @param n number of records + * @return list of records + * @throws IOException + */ + public List<BackupInfo> getHistory(int n) throws IOException { + + List<BackupInfo> history = getBackupHistory(); + if (history.size() <= n) return history; + List<BackupInfo> list = new ArrayList<BackupInfo>(); + for (int i = 0; i < n; i++) { + list.add(history.get(i)); + } + return list; + + } + + /** + * Get backup history records filtered by list of filters. + * @param n max number of records + * @param filters list of filters + * @return backup records + * @throws IOException + */ + public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter...
filters) throws IOException { + if (filters.length == 0) return getHistory(n); + + List<BackupInfo> history = getBackupHistory(); + List<BackupInfo> result = new ArrayList<BackupInfo>(); + for (BackupInfo bi : history) { + if (result.size() == n) break; + boolean passed = true; + for (int i = 0; i < filters.length; i++) { + if (!filters[i].apply(bi)) { + passed = false; + break; + } + } + if (passed) { + result.add(bi); + } + } + return result; + + } + + /** + * Get history for backup destination + * @param backupRoot backup destination path + * @return List of backup info + * @throws IOException + */ + public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException { + ArrayList<BackupInfo> history = getBackupHistory(false); + for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { + BackupInfo info = iterator.next(); + if (!backupRoot.equals(info.getBackupRootDir())) { + iterator.remove(); + } + } + return history; + } + + /** + * Get history for a table + * @param name table name + * @return history for a table + * @throws IOException + */ + public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException { + List<BackupInfo> history = getBackupHistory(); + List<BackupInfo> tableHistory = new ArrayList<BackupInfo>(); + for (BackupInfo info : history) { + List<TableName> tables = info.getTableNames(); + if (tables.contains(name)) { + tableHistory.add(info); + } + } + return tableHistory; + } + + public Map<TableName, ArrayList<BackupInfo>> getBackupHistoryForTableSet(Set<TableName> set, + String backupRoot) throws IOException { + List<BackupInfo> history = getBackupHistory(backupRoot); + Map<TableName, ArrayList<BackupInfo>> tableHistoryMap = + new HashMap<TableName, ArrayList<BackupInfo>>(); + for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) { + BackupInfo info = iterator.next(); + if (!backupRoot.equals(info.getBackupRootDir())) { + continue; + } + List<TableName> tables = info.getTableNames(); + for (TableName tableName : tables) { + if (set.contains(tableName)) { + ArrayList<BackupInfo> list = tableHistoryMap.get(tableName); + if (list == null) { + list = new ArrayList<BackupInfo>(); + tableHistoryMap.put(tableName, list); + } + list.add(info); + } + } + } + return tableHistoryMap; + } + + /** + * Get all backup sessions with a given state (in descending order by time) + * @param state backup session state + * @return history info of backup info objects + * @throws IOException exception + */ + public ArrayList<BackupInfo> getBackupInfos(BackupState state) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get backup infos from backup system table"); + } + + Scan scan = createScanForBackupHistory(); + ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(); + + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + BackupInfo context = cellToBackupInfo(res.current()); + if (state != BackupState.ANY && context.getState() != state) { + continue; + } + list.add(context); + } + return list; + } + } + + /** + * Write the current timestamps for each regionserver to backup system table after a successful + * full or incremental backup. The saved timestamp is of the last log file that was backed up + * already. 
+ * @param tables tables + * @param newTimestamps timestamps + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void writeRegionServerLogTimestamp(Set<TableName> tables, + HashMap<String, Long> newTimestamps, String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("write RS log timestamps to backup system table for tables [" + + StringUtils.join(tables, ",") + "]"); + } + List<Put> puts = new ArrayList<Put>(); + for (TableName table : tables) { + byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); + Put put = + createPutForWriteRegionServerLogTimestamp(table, smapData, + backupRoot); + puts.add(put); + } + try (Table table = connection.getTable(tableName)) { + table.put(puts); + } + } + + /** + * Read the timestamp for each region server log after the last successful backup. Each table has + * its own set of timestamps. The info is stored for each table as a concatenated string of + * rs->timestamp. + * @param backupRoot root directory path to backup + * @return the timestamp for each region server. key: tableName value: + *         RegionServer,PreviousTimeStamp + * @throws IOException exception + */ + public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("read RS log ts from backup system table for root=" + backupRoot); + } + + HashMap<TableName, HashMap<String, Long>> tableTimestampMap = + new HashMap<TableName, HashMap<String, Long>>(); + + Scan scan = createScanForReadLogTimestampMap(backupRoot); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String tabName = getTableNameForReadLogTimestampMap(row); + TableName tn = TableName.valueOf(tabName); + byte[] data = CellUtil.cloneValue(cell); + if (data == null || data.length == 0) { + throw new IOException("Data of last backup data from backup system table " + + "is empty. Create a backup first."); + } + HashMap<String, Long> lastBackup = + fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data)); + tableTimestampMap.put(tn, lastBackup); + } + return tableTimestampMap; + } + } + + private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table, + Map<String, Long> map) { + BackupProtos.TableServerTimestamp.Builder tstBuilder = + BackupProtos.TableServerTimestamp.newBuilder(); + tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil + .toProtoTableName(table)); + + for (Entry<String, Long> entry : map.entrySet()) { + BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); + HBaseProtos.ServerName.Builder snBuilder = HBaseProtos.ServerName.newBuilder(); + ServerName sn = ServerName.parseServerName(entry.getKey()); + snBuilder.setHostName(sn.getHostname()); + snBuilder.setPort(sn.getPort()); + builder.setServerName(snBuilder.build()); + builder.setTimestamp(entry.getValue()); + tstBuilder.addServerTimestamp(builder.build()); + } + + return tstBuilder.build(); + } + + private HashMap<String, Long> fromTableServerTimestampProto( + BackupProtos.TableServerTimestamp proto) { + HashMap<String, Long> map = new HashMap<String, Long>(); + List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); + for (BackupProtos.ServerTimestamp st : list) { + ServerName sn = + org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toServerName(st.getServerName()); + map.put(sn.getHostname() + ":" + sn.getPort(), st.getTimestamp()); + } + return map; + } + + /** + * Return the current tables covered by incremental backup. + * @param backupRoot root directory path to backup + * @return set of tableNames + * @throws IOException exception + */ + public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get incremental backup table set from backup system table"); + } + TreeSet<TableName> set = new TreeSet<>(); + + try (Table table = connection.getTable(tableName)) { + Get get = createGetForIncrBackupTableSet(backupRoot); + Result res = table.get(get); + if (res.isEmpty()) { + return set; + } + List<Cell> cells = res.listCells(); + for (Cell cell : cells) { + // qualifier = table name - we use table names as qualifiers + set.add(TableName.valueOf(CellUtil.cloneQualifier(cell))); + } + return set; + } + } + + /** + * Add tables to global incremental backup set + * @param tables set of tables + * @param backupRoot root directory path to backup + * @throws IOException exception + */ + public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot + + " tables [" + StringUtils.join(tables, " ") + "]"); + for (TableName table : tables) { + LOG.debug(table); + } + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForIncrBackupTableSet(tables, backupRoot); + table.put(put); + } + } + + /** + * Deletes incremental backup set for a backup destination + * @param backupRoot backup root + * @throws IOException exception + */ + public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Delete incremental backup table set from backup system table.
ROOT=" + backupRoot); + } + try (Table table = connection.getTable(tableName)) { + Delete delete = createDeleteForIncrBackupTableSet(backupRoot); + table.delete(delete); + } + } + + /** + * Register WAL files as eligible for deletion + * @param files files + * @param backupId backup id + * @param backupRoot root directory path to backup destination + * @throws IOException exception + */ + public void addWALFiles(List<String> files, String backupId, String backupRoot) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files [" + + StringUtils.join(files, ",") + "]"); + for (String f : files) { + LOG.debug("add :" + f); + } + } + try (Table table = connection.getTable(tableName)) { + List<Put> puts = + createPutsForAddWALFiles(files, backupId, backupRoot); + table.put(puts); + } + } + + /** + * Creates an iterator over WAL files registered in the backup system table + * @param backupRoot root directory path to backup + * @return WAL files iterator + * @throws IOException exception + */ + public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("get WAL files from backup system table"); + } + final Table table = connection.getTable(tableName); + Scan scan = createScanForGetWALs(backupRoot); + final ResultScanner scanner = table.getScanner(scan); + final Iterator<Result> it = scanner.iterator(); + return new Iterator<WALItem>() { + + @Override + public boolean hasNext() { + boolean next = it.hasNext(); + if (!next) { + // close all + try { + scanner.close(); + table.close(); + } catch (IOException e) { + LOG.error("Close WAL Iterator", e); + } + } + return next; + } + + @Override + public WALItem next() { + Result next = it.next(); + List<Cell> cells = next.listCells(); + byte[] buf = cells.get(0).getValueArray(); + int len = cells.get(0).getValueLength(); + int offset = cells.get(0).getValueOffset(); + String backupId = new String(buf, offset, len); + buf = cells.get(1).getValueArray(); + len = cells.get(1).getValueLength(); + offset = cells.get(1).getValueOffset(); + String walFile = new String(buf, offset, len); + buf = cells.get(2).getValueArray(); + len = cells.get(2).getValueLength(); + offset = cells.get(2).getValueOffset(); + String backupRoot = new String(buf, offset, len); + return new WALItem(backupId, walFile, backupRoot); + } + + @Override + public void remove() { + // not implemented + throw new UnsupportedOperationException("remove is not supported"); + } + }; + + } + + /** + * Check if WAL file is eligible for deletion. Future: support all backup destinations + * @param file name of a file to check + * @return true if deletable, false otherwise.
+ * @throws IOException exception + */ + public boolean isWALFileDeletable(String file) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Check if WAL file has been already backed up in backup system table " + file); + } + try (Table table = connection.getTable(tableName)) { + Get get = createGetForCheckWALFile(file); + Result res = table.get(get); + return !res.isEmpty(); + } + } + + /** + * Checks if we have at least one backup session in backup system table. This API is used by + * BackupLogCleaner + * @return true if at least one session exists in backup system table + * @throws IOException exception + */ + public boolean hasBackupSessions() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Has backup sessions from backup system table"); + } + boolean result = false; + Scan scan = createScanForBackupHistory(); + scan.setCaching(1); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + if (scanner.next() != null) { + result = true; + } + return result; + } + } + + /** + * BACKUP SETS + */ + + /** + * Get backup set list + * @return backup set list + * @throws IOException + */ + public List<String> listBackupSets() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set list"); + } + List<String> list = new ArrayList<String>(); + Table table = null; + ResultScanner scanner = null; + try { + table = connection.getTable(tableName); + Scan scan = createScanForBackupSetList(); + scan.setMaxVersions(1); + scanner = table.getScanner(scan); + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + list.add(cellKeyToBackupSetName(res.current())); + } + return list; + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + /** + * Get backup set description (list of tables) + * @param name set's name + * @return list of tables in a backup set + * @throws IOException + */ + public List<TableName> describeBackupSet(String name) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set describe: " + name); + } + Table table = null; + try { + table = connection.getTable(tableName); + Get get = createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) return null; + res.advance(); + String[] tables = cellValueToBackupSet(res.current()); + return toList(tables); + } finally { + if (table != null) { + table.close(); + } + } + } + + private List<TableName> toList(String[] tables) { + List<TableName> list = new ArrayList<TableName>(tables.length); + for (String name : tables) { + list.add(TableName.valueOf(name)); + } + return list; + } + + /** + * Add backup set (list of tables) + * @param name set name + * @param newTables list of tables to add + * @throws IOException + */ + public void addToBackupSet(String name, String[] newTables) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]"); + } + Table table = null; + String[] union = null; + try { + table = connection.getTable(tableName); + Get get = createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) { + union = newTables; + } else { + res.advance(); + String[] tables = cellValueToBackupSet(res.current()); + union = merge(tables, newTables); + } + Put put = createPutForBackupSet(name, union); + table.put(put); + } finally { + if (table != null) { + table.close();
+ } + } + } + + private String[] merge(String[] tables, String[] newTables) { + List<String> list = new ArrayList<String>(); + // Add all from tables + for (String t : tables) { + list.add(t); + } + for (String nt : newTables) { + if (list.contains(nt)) continue; + list.add(nt); + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + /** + * Remove tables from backup set (list of tables) + * @param name set name + * @param toRemove list of tables + * @throws IOException + */ + public void removeFromBackupSet(String name, String[] toRemove) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + + "]"); + } + Table table = null; + String[] disjoint = null; + String[] tables = null; + try { + table = connection.getTable(tableName); + Get get = createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) { + LOG.warn("Backup set '" + name + "' not found."); + return; + } else { + res.advance(); + tables = cellValueToBackupSet(res.current()); + disjoint = disjoin(tables, toRemove); + } + if (disjoint.length > 0 && disjoint.length != tables.length) { + Put put = createPutForBackupSet(name, disjoint); + table.put(put); + } else if(disjoint.length == tables.length) { + LOG.warn("Backup set '" + name + "' does not contain tables [" + + StringUtils.join(toRemove, " ") + "]"); + } else { // disjoint.length == 0 and tables.length >0 + // Delete backup set + LOG.info("Backup set '"+name+"' is empty. Deleting."); + deleteBackupSet(name); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + private String[] disjoin(String[] tables, String[] toRemove) { + List<String> list = new ArrayList<String>(); + // Add all from tables + for (String t : tables) { + list.add(t); + } + for (String nt : toRemove) { + if (list.contains(nt)) { + list.remove(nt); + } + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + /** + * Delete backup set + * @param name set's name + * @throws IOException + */ + public void deleteBackupSet(String name) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(" Backup set delete: " + name); + } + Table table = null; + try { + table = connection.getTable(tableName); + Delete del = createDeleteForBackupSet(name); + table.delete(del); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Get backup system table descriptor + * @return table's descriptor + */ + public static HTableDescriptor getSystemTableDescriptor(Configuration conf) { + + HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf)); + HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY); + colSessionsDesc.setMaxVersions(1); + // Time to keep backup sessions (secs) + Configuration config = HBaseConfiguration.create(); + int ttl = + config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); + colSessionsDesc.setTimeToLive(ttl); + tableDesc.addFamily(colSessionsDesc); + HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); + tableDesc.addFamily(colMetaDesc); + return tableDesc; + } + + public static TableName getTableName(Configuration conf) { + String name = + conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); + return TableName.valueOf(name); + } + + public static String getTableNameAsString(Configuration conf) { + return 
getTableName(conf).getNameAsString(); + } + + + + + + /** + * Creates Put operation for a given backup info object + * @param context backup info + * @return put operation + * @throws IOException exception + */ + private Put createPutForBackupInfo(BackupInfo context) throws IOException { + Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId())); + put.addColumn(BackupSystemTable.SESSIONS_FAMILY, Bytes.toBytes("context"), + context.toByteArray()); + return put; + } + + /** + * Creates Get operation for a given backup id + * @param backupId backup's ID + * @return get operation + * @throws IOException exception + */ + private Get createGetForBackupInfo(String backupId) throws IOException { + Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId)); + get.addFamily(BackupSystemTable.SESSIONS_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Delete operation for a given backup id + * @param backupId backup's ID + * @return delete operation + * @throws IOException exception + */ + private Delete createDeleteForBackupInfo(String backupId) { + Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId)); + del.addFamily(BackupSystemTable.SESSIONS_FAMILY); + return del; + } + + /** + * Converts Result to BackupInfo + * @param res HBase result + * @return backup info instance + * @throws IOException exception + */ + private BackupInfo resultToBackupInfo(Result res) throws IOException { + res.advance(); + Cell cell = res.current(); + return cellToBackupInfo(cell); + } + + /** + * Creates Get operation to retrieve start code from backup system table + * @return get operation + * @throws IOException exception + */ + private Get createGetForStartCode(String rootPath) throws IOException { + Get get = new Get(rowkey(START_CODE_ROW, rootPath)); + get.addFamily(BackupSystemTable.META_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put operation to store start code to backup system table + * @return put operation + * @throws IOException exception + */ + private Put createPutForStartCode(String startCode, String rootPath) { + Put put = new Put(rowkey(START_CODE_ROW, rootPath)); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"), + Bytes.toBytes(startCode)); + return put; + } + + /** + * Creates Get to retrieve incremental backup table set from backup system table + * @return get operation + * @throws IOException exception + */ + private Get createGetForIncrBackupTableSet(String backupRoot) throws IOException { + Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot)); + get.addFamily(BackupSystemTable.META_FAMILY); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put to store incremental backup table set + * @param tables tables + * @return put operation + */ + private Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) { + Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot)); + for (TableName table : tables) { + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()), + EMPTY_VALUE); + } + return put; + } + + /** + * Creates Delete for incremental backup table set + * @param backupRoot backup root + * @return delete operation + */ + private Delete createDeleteForIncrBackupTableSet(String backupRoot) { + Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot)); + delete.addFamily(BackupSystemTable.META_FAMILY); + return delete; + } + + /** + * Creates Scan operation to load backup history + * @return scan operation + */ + private Scan 
createScanForBackupHistory() { + Scan scan = new Scan(); + byte[] startRow = Bytes.toBytes(BACKUP_INFO_PREFIX); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.SESSIONS_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + /** + * Converts cell to backup info instance. + * @param current current cell + * @return backup backup info instance + * @throws IOException exception + */ + private BackupInfo cellToBackupInfo(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + return BackupInfo.fromByteArray(data); + } + + /** + * Creates Put to write RS last roll log timestamp map + * @param table table + * @param smap map, containing RS:ts + * @return put operation + */ + private Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap, + String backupRoot) { + Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString())); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("log-roll-map"), smap); + return put; + } + + /** + * Creates Scan to load table-> { RS -> ts} map of maps + * @return scan operation + */ + private Scan createScanForReadLogTimestampMap(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + + return scan; + } + + /** + * Get table name from rowkey + * @param cloneRow rowkey + * @return table name + */ + private String getTableNameForReadLogTimestampMap(byte[] cloneRow) { + String s = Bytes.toString(cloneRow); + int index = s.lastIndexOf(NULL); + return s.substring(index + 1); + } + + /** + * Creates Put to store RS last log result + * @param server server name + * @param timestamp log roll result (timestamp) + * @return put operation + */ + private Put createPutForRegionServerLastLogRollResult(String server, Long timestamp, + String backupRoot) { + Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server)); + put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("rs-log-ts"), + Bytes.toBytes(timestamp)); + return put; + } + + /** + * Creates Scan operation to load last RS log roll results + * @return scan operation + */ + private Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) { + Scan scan = new Scan(); + byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + + return scan; + } + + /** + * Get server's name from rowkey + * @param row rowkey + * @return server's name + */ + private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { + String s = Bytes.toString(row); + int index = s.lastIndexOf(NULL); + return s.substring(index + 1); + } + + /** + * Creates put list for list of WAL files + * @param files list of WAL file paths + * @param backupId backup id + * @return put list + * @throws IOException exception + */ + private List<Put> createPutsForAddWALFiles(List<String> files, String 
+  /**
+   * Creates put list for list of WAL files
+   * @param files list of WAL file paths
+   * @param backupId backup id
+   * @param backupRoot backup root path
+   * @return put list
+   * @throws IOException exception
+   */
+  private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
+      String backupRoot) throws IOException {
+
+    List<Put> puts = new ArrayList<Put>();
+    for (String file : files) {
+      Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
+        Bytes.toBytes(backupId));
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
+        Bytes.toBytes(backupRoot));
+      puts.add(put);
+    }
+    return puts;
+  }
+
+  /**
+   * Creates Scan operation to load WALs
+   * @param backupRoot path to backup destination
+   * @return scan operation
+   */
+  private Scan createScanForGetWALs(String backupRoot) {
+    // TODO: support for backupRoot
+    Scan scan = new Scan();
+    byte[] startRow = Bytes.toBytes(WALS_PREFIX);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation for a given WAL file name.
+   * TODO: support for backup destination
+   * @param file file
+   * @return get operation
+   * @throws IOException exception
+   */
+  private Get createGetForCheckWALFile(String file) throws IOException {
+    Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
+    // add backup root column
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    return get;
+  }
+
+  /**
+   * Creates Scan operation to load backup set list
+   * @return scan operation
+   */
+  private Scan createScanForBackupSetList() {
+    Scan scan = new Scan();
+    byte[] startRow = Bytes.toBytes(SET_KEY_PREFIX);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation to load backup set content
+   * @param name backup set's name
+   * @return get operation
+   */
+  private Get createGetForBackupSet(String name) {
+    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    return get;
+  }
+
+  /**
+   * Creates Delete operation to delete backup set content
+   * @param name backup set's name
+   * @return delete operation
+   */
+  private Delete createDeleteForBackupSet(String name) {
+    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
+    del.addFamily(BackupSystemTable.META_FAMILY);
+    return del;
+  }
+
+  /**
+   * Creates Put operation to update backup set content
+   * @param name backup set's name
+   * @param tables list of tables
+   * @return put operation
+   */
+  private Put createPutForBackupSet(String name, String[] tables) {
+    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
+    byte[] value = convertToByteArray(tables);
+    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("tables"), value);
+    return put;
+  }
+
+  private byte[] convertToByteArray(String[] tables) {
+    // use Bytes.toBytes rather than String.getBytes() to avoid depending on
+    // the platform default charset
+    return Bytes.toBytes(StringUtils.join(tables, ","));
+  }
+
+  /**
+   * Converts cell to backup set list.
+   * @param current current cell
+   * @return backup set as array of table names
+   * @throws IOException exception
+   */
+  private String[] cellValueToBackupSet(Cell current) throws IOException {
+    byte[] data = CellUtil.cloneValue(current);
+    if (data != null && data.length > 0) {
+      return Bytes.toString(data).split(",");
+    } else {
+      return new String[0];
+    }
+  }
+
+  /**
+   * Converts cell key to backup set name.
+   * @param current current cell
+   * @return backup set name
+   * @throws IOException exception
+   */
+  private String cellKeyToBackupSetName(Cell current) throws IOException {
+    byte[] data = CellUtil.cloneRow(current);
+    return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
+  }
+
+  private byte[] rowkey(String s, String... other) {
+    StringBuilder sb = new StringBuilder(s);
+    for (String ss : other) {
+      sb.append(ss);
+    }
+    return Bytes.toBytes(sb.toString());
+  }
+}
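Side note on the rowkey convention used throughout this class: components are concatenated as plain strings, with the one-character NULL constant separating the variable parts, and parsers such as getTableNameForReadLogTimestampMap() recover the trailing component with lastIndexOf. A minimal round-trip sketch, assuming NULL has the value "\u0000" and using an illustrative prefix and values:

    String NULL = "\u0000";                                  // assumed separator value
    // compose, as rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, tableName) does:
    String key = "trslm:" + "hdfs://ns/backup" + NULL + "ns1:t1";
    // parse the trailing component back out:
    String table = key.substring(key.lastIndexOf(NULL) + 1); // -> "ns1:t1"

This works because the separator never occurs inside a table or server name; only the last component can be recovered, which is all the read paths need.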
http://git-wip-us.apache.org/repos/asf/hbase/blob/75d0f49d/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
new file mode 100644
index 0000000..77d1184
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupCopyJob;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Full table backup implementation
+ */
+@InterfaceAudience.Private
+public class FullTableBackupClient extends TableBackupClient {
+  private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class);
+
+  public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
+      throws IOException {
+    super(conn, backupId, request);
+  }
+
+  /**
+   * Does the snapshot copy.
+   * @param backupInfo backup info
+   * @throws Exception exception
+   */
+  private void snapshotCopy(BackupInfo backupInfo) throws Exception {
+    LOG.info("Snapshot copy is starting.");
+
+    // set overall backup phase: snapshot_copy
+    backupInfo.setPhase(BackupPhase.SNAPSHOTCOPY);
+
+    // call ExportSnapshot to copy files based on hbase snapshot for backup
+    // ExportSnapshot only supports a single snapshot export, so we need to loop
+    // for the multiple-table case
+    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+
+    // number of snapshots matches number of tables
+    float numOfSnapshots = backupInfo.getSnapshotNames().size();
+
+    LOG.debug("There are " + (int) numOfSnapshots + " snapshots to be copied.");
+
+    for (TableName table : backupInfo.getTables()) {
+      // Currently we simply set the sub copy tasks by counting the table snapshot number, we can
+      // calculate the real files' size for the percentage in the future.
+      // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
+      int res = 0;
+      String[] args = new String[4];
+      args[0] = "-snapshot";
+      args[1] = backupInfo.getSnapshotName(table);
+      args[2] = "-copy-to";
+      args[3] = backupInfo.getTableBackupDir(table);
+
+      LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
+      res = copyService.copy(backupInfo, backupManager, conf, BackupType.FULL, args);
+      // if one snapshot export fails, do not continue with the remaining snapshots
+      if (res != 0) {
+        LOG.error("Exporting Snapshot " + args[1] + " failed with return code: " + res + ".");
+
+        throw new IOException("Failed to export snapshot " + args[1] + " to " + args[3]
+            + ", reason code " + res);
+      }
+      LOG.info("Snapshot copy " + args[1] + " finished.");
+    }
+  }
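+
+  // Note: the args array above mirrors the command line of the stock
+  // org.apache.hadoop.hbase.snapshot.ExportSnapshot tool, i.e. roughly
+  //   ExportSnapshot -snapshot <snapshot name> -copy-to <table backup dir>
+  // run once per table; any non-zero return code aborts the whole backup.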
+
+  /**
+   * Backup request execution
+   * @throws IOException exception
+   */
+  @Override
+  public void execute() throws IOException {
+
+    try (Admin admin = conn.getAdmin()) {
+
+      // Begin BACKUP
+      beginBackup(backupManager, backupInfo);
+      String savedStartCode = null;
+      boolean firstBackup = false;
+      // do snapshot for full table backup
+
+      savedStartCode = backupManager.readBackupStartCode();
+      firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
+      if (firstBackup) {
+        // This is our first backup. Let's put a marker into the system table so
+        // that we can hold the logs while we do the backup.
+        backupManager.writeBackupStartCode(0L);
+      }
+      // We roll log here before we do the snapshot. It is possible there is duplicate data
+      // in the log that is already in the snapshot. But if we do it after the snapshot, we
+      // could have data loss.
+      // A better approach is to do the roll log on each RS in the same global procedure as
+      // the snapshot.
+      LOG.info("Execute roll log procedure for full backup ...");
+
+      Map<String, String> props = new HashMap<String, String>();
+      props.put("backupRoot", backupInfo.getBackupRootDir());
+      admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+        LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+      newTimestamps = backupManager.readRegionServerLastLogRollResult();
+      if (firstBackup) {
+        // Updates registered log files
+        // We record ALL old WAL files as registered, because
+        // this is the first full backup in the system and these
+        // files are not needed for the next incremental backup
+        List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
+        backupManager.recordWALFiles(logFiles);
+      }
+
+      // SNAPSHOT_TABLES:
+      backupInfo.setPhase(BackupPhase.SNAPSHOT);
+      for (TableName tableName : tableList) {
+        String snapshotName =
+            "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_"
+                + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
+
+        admin.snapshot(snapshotName, tableName);
+
+        backupInfo.setSnapshotName(tableName, snapshotName);
+      }
+
+      // SNAPSHOT_COPY:
+      // do snapshot copy
+      LOG.debug("snapshot copy for " + backupId);
+      snapshotCopy(backupInfo);
+      // Updates incremental backup table set
+      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
+
+      // BACKUP_COMPLETE:
+      // set overall backup status: complete. Here we make sure to complete the backup.
+      // After this checkpoint, even if a cancel request comes in, the backup is
+      // allowed to finish.
+      backupInfo.setState(BackupState.COMPLETE);
+      // The table list in backupInfo is good for both full backup and incremental backup.
+      // For incremental backup, it contains the incremental backup table set.
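+      // Persist the per-table map of region server -> last log-roll timestamp;
+      // subsequent incremental backups read this back to decide which WAL files
+      // they still need, and the new start code below is the minimum of these
+      // timestamps.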
+      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+
+      HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
+          backupManager.readLogTimestampMap();
+
+      Long newStartCode =
+          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
+      backupManager.writeBackupStartCode(newStartCode);
+
+      // backup complete
+      completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);
+    } catch (Exception e) {
+      failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
+        BackupType.FULL, conf);
+      throw new IOException(e);
+    }
+
+  }
+
+}
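To make the closing bookkeeping concrete, here is a rough standalone rendering of the start-code computation, under the assumption that BackupUtils.getRSLogTimestampMins takes, per region server, the smallest last-roll timestamp across all tables, and getMinValue then takes the smallest of those; the class name and sample values are illustrative, not part of this patch:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class StartCodeSketch {
      public static void main(String[] args) {
        // table -> (region server -> last log-roll timestamp)
        Map<String, Map<String, Long>> byTable = new HashMap<>();
        Map<String, Long> t1 = new HashMap<>();
        t1.put("rs1", 100L);
        t1.put("rs2", 250L);
        Map<String, Long> t2 = new HashMap<>();
        t2.put("rs1", 180L);
        t2.put("rs2", 90L);
        byTable.put("t1", t1);
        byTable.put("t2", t2);

        // per-RS minimum across tables: rs1 -> 100, rs2 -> 90
        Map<String, Long> rsMins = new HashMap<>();
        for (Map<String, Long> rsMap : byTable.values()) {
          for (Map.Entry<String, Long> e : rsMap.entrySet()) {
            rsMins.merge(e.getKey(), e.getValue(), Math::min);
          }
        }
        // the global minimum becomes the new start code: WALs older than this
        // timestamp cannot be needed by any future incremental backup
        long newStartCode = Collections.min(rsMins.values());
        System.out.println(newStartCode); // prints 90
      }
    }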