http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java new file mode 100644 index 0000000..14769f9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTable.java @@ -0,0 +1,642 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupHandler.BACKUPSTATUS; +import org.apache.hadoop.hbase.backup.BackupUtil.BackupCompleteData; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; + +/** + * This class provides 'hbase:backup' table API + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BackupSystemTable { + + private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); + private final static String TABLE_NAMESPACE = "hbase"; + private final static String TABLE_NAME = "backup"; + private final static TableName tableName = TableName.valueOf(TABLE_NAMESPACE, TABLE_NAME); + public final static byte[] familyName = "f".getBytes(); + + // Connection to HBase cluster + private static Connection connection; + // Cluster configuration + private static Configuration config; + // singleton + private static BackupSystemTable table; + + /** + * Get instance by a given configuration + * @param conf - HBase 
configuration + * @return instance of BackupSystemTable + * @throws IOException exception + */ + public synchronized static BackupSystemTable getTable(Configuration conf) throws IOException { + if (connection == null) { + connection = ConnectionFactory.createConnection(conf); + config = conf; + // Verify that hbase:backup exists + createSystemTableIfNotExists(); + table = new BackupSystemTable(); + } + return table; + } + + /** + * TODO: refactor + * @throws IOException exception + */ + public static void close() throws IOException { + connection.close(); + table = null; + } + + /** + * Gets table name + * @return table name + */ + public static TableName getTableName() { + return tableName; + } + + private static void createSystemTableIfNotExists() throws IOException { + Admin admin = null; + try { + admin = connection.getAdmin(); + if (!admin.tableExists(tableName)) { + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(familyName); + colDesc.setMaxVersions(1); + int ttl = + config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT); + colDesc.setTimeToLive(ttl); + tableDesc.addFamily(colDesc); + admin.createTable(tableDesc); + } + } catch (IOException e) { + LOG.error(e); + throw e; + } finally { + if (admin != null) { + admin.close(); + } + } + } + + private BackupSystemTable() { + } + + /** + * Updates status (state) of a backup session in hbase:backup table + * @param context context + * @throws IOException exception + */ + public void updateBackupStatus(BackupContext context) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("update backup status in hbase:backup for: " + context.getBackupId() + + " set status=" + context.getFlag()); + } + Table table = null; + try { + table = connection.getTable(tableName); + Put put = BackupSystemTableHelper.createPutForBackupContext(context); + table.put(put); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Deletes backup status from hbase:backup table + * @param backupId backup id + * @throws IOException exception + */ + public void deleteBackupStatus(String backupId) throws IOException { + + if (LOG.isDebugEnabled()) { + LOG.debug("delete backup status in hbase:backup for " + backupId); + } + Table table = null; + try { + table = connection.getTable(tableName); + Delete del = BackupSystemTableHelper.createDeletForBackupContext(backupId); + table.delete(del); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Reads backup status object (instance of BackupContext) from hbase:backup table + * @param backupId - backupId + * @return Current status of backup session or null + * @throws IOException exception + */ + public BackupContext readBackupStatus(String backupId) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("read backup status from hbase:backup for: " + backupId); + } + + Table table = null; + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForBackupContext(backupId); + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + return BackupSystemTableHelper.resultToBackupContext(res); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Read the start code (timestamp) of the last successful backup. Returns null if there is no + * start code stored in HBase or the stored value has length 0; either case indicates that no + * backup has completed successfully so far.
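 + * The code is stored under the single "startcode" rowkey (see BackupSystemTableHelper).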
+ * @return the timestamp of last successful backup + * @throws IOException exception + */ + public String readBackupStartCode() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("read backup start code from hbase:backup"); + } + Table table = null; + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForStartCode(); + Result res = table.get(get); + if (res.isEmpty()){ + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0){ + return null; + } + return new String(val); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Write the start code (timestamp) to hbase:backup. If passed in null, then write 0 byte. + * @param startCode start code + * @throws IOException exception + */ + public void writeBackupStartCode(String startCode) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write backup start code to hbase:backup " + startCode); + } + Table table = null; + try { + table = connection.getTable(tableName); + Put put = BackupSystemTableHelper.createPutForStartCode(startCode); + table.put(put); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Get the Region Servers log information after the last log roll from hbase:backup. + * @return RS log info + * @throws IOException exception + */ + public HashMap<String, String> readRegionServerLastLogRollResult() + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("read region server last roll log result to hbase:backup"); + } + Table table = null; + ResultScanner scanner = null; + + try { + table = connection.getTable(tableName); + Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult(); + scan.setMaxVersions(1); + scanner = table.getScanner(scan); + Result res = null; + HashMap<String, String> rsTimestampMap = new HashMap<String, String>(); + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String server = + BackupSystemTableHelper.getServerNameForReadRegionServerLastLogRollResult(row); + + byte[] data = CellUtil.cloneValue(cell); + rsTimestampMap.put(server, new String(data)); + } + return rsTimestampMap; + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + /** + * Writes Region Server last roll log result (timestamp) to hbase:backup table + * @param server - Region Server name + * @param fileName - last log timestamp + * @throws IOException exception + */ + public void writeRegionServerLastLogRollResult(String server, String fileName) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write region server last roll log result to hbase:backup"); + } + Table table = null; + try { + table = connection.getTable(tableName); + Put put = + BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server, fileName); + table.put(put); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Get all completed backup information (in desc order by time) + * @return history info of BackupCompleteData + * @throws IOException exception + */ + public ArrayList<BackupCompleteData> getBackupHistory() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("get backup history from hbase:backup"); + } + Table table = null; + ResultScanner scanner = null; + ArrayList<BackupCompleteData> list = new ArrayList<BackupCompleteData>(); + try { + table = 
connection.getTable(tableName); + Scan scan = BackupSystemTableHelper.createScanForBackupHistory(); + scan.setMaxVersions(1); + scanner = table.getScanner(scan); + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + BackupContext context = BackupSystemTableHelper.cellToBackupContext(res.current()); + if (context.getFlag() != BACKUPSTATUS.COMPLETE) { + continue; + } + + BackupCompleteData history = new BackupCompleteData(); + history.setBackupToken(context.getBackupId()); + history.setStartTime(Long.toString(context.getStartTs())); + history.setEndTime(Long.toString(context.getEndTs())); + history.setBackupRootPath(context.getTargetRootDir()); + history.setTableList(context.getTableListAsString()); + history.setType(context.getType()); + history.setBytesCopied(Long.toString(context.getTotalBytesCopied())); + + if (context.fromExistingSnapshot()) { + history.markFromExistingSnapshot(); + } + list.add(history); + } + return BackupUtil.sortHistoryListDesc(list); + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + /** + * Get all backup session with a given status (in desc order by time) + * @param status status + * @return history info of backup contexts + * @throws IOException exception + */ + public ArrayList<BackupContext> getBackupContexts(BACKUPSTATUS status) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("get backup contexts from hbase:backup"); + } + Table table = null; + ResultScanner scanner = null; + ArrayList<BackupContext> list = new ArrayList<BackupContext>(); + try { + table = connection.getTable(tableName); + Scan scan = BackupSystemTableHelper.createScanForBackupHistory(); + scan.setMaxVersions(1); + scanner = table.getScanner(scan); + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + BackupContext context = BackupSystemTableHelper.cellToBackupContext(res.current()); + if (context.getFlag() != status){ + continue; + } + list.add(context); + } + return list; + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + /** + * Write the current timestamps for each regionserver to hbase:backup after a successful full or + * incremental backup. The saved timestamp is of the last log file that was backed up already. + * @param tables tables + * @param newTimestamps timestamps + * @throws IOException exception + */ + public void writeRegionServerLogTimestamp(Set<String> tables, + HashMap<String, String> newTimestamps) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write RS log ts to HBASE_BACKUP"); + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, String> entry : newTimestamps.entrySet()) { + String host = entry.getKey(); + String timestamp = entry.getValue(); + sb.append(host).append(BackupUtil.FIELD_SEPARATOR).append(timestamp) + .append(BackupUtil.RECORD_SEPARATOR); + } + String smap = sb.toString(); + List<Put> puts = new ArrayList<Put>(); + for (String table : tables) { + Put put = BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table, smap); + puts.add(put); + } + Table table = null; + try { + table = connection.getTable(tableName); + table.put(puts); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Read the timestamp for each region server log after the last successful backup. Each table has + * its own set of the timestamps. 
The info is stored for each table as a concatenated string of + * rs->timestamp + * @return the timestamps for each region server; key: tableName, value: map of RegionServer to + * its last backed-up timestamp + * @throws IOException exception + */ + public HashMap<String, HashMap<String, String>> readLogTimestampMap() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("read RS log ts from HBASE_BACKUP"); + } + + Table table = null; + ResultScanner scanner = null; + HashMap<String, HashMap<String, String>> tableTimestampMap = + new HashMap<String, HashMap<String, String>>(); + + try { + table = connection.getTable(tableName); + Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap(); + scanner = table.getScanner(scan); + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String tabName = BackupSystemTableHelper.getTableNameForReadLogTimestampMap(row); + HashMap<String, String> lastBackup = new HashMap<String, String>(); + byte[] data = CellUtil.cloneValue(cell); + if (data == null) { + // TODO + throw new IOException("Data of last backup data from HBASE_BACKUP " + + "is empty. Create a backup first."); + } + // data is known to be non-null here; the null case has already thrown + if (data.length > 0) { + String s = new String(data); + String[] records = s.split(BackupUtil.RECORD_SEPARATOR); + for (String record : records) { + String[] flds = record.split(BackupUtil.FIELD_SEPARATOR); + if (flds.length != 2) { + throw new IOException("data from HBASE_BACKUP is corrupted: " + + Arrays.toString(flds)); + } + lastBackup.put(flds[0], flds[1]); + } + tableTimestampMap.put(tabName, lastBackup); + } + } + return tableTimestampMap; + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } + + /** + * Return the current tables covered by incremental backup. + * @return set of tableNames + * @throws IOException exception + */ + public Set<String> getIncrementalBackupTableSet() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("get incr backup table set from hbase:backup"); + } + Table table = null; + TreeSet<String> set = new TreeSet<String>(); + + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForIncrBackupTableSet(); + Result res = table.get(get); + if (res.isEmpty()) { + return set; + } + List<Cell> cells = res.listCells(); + for (Cell cell : cells) { + // qualifier = table name - we use table names as qualifiers + // TODO ns:table as qualifier?
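 + // (Each qualifier in this row is a bare table name, as written by createPutForIncrBackupTableSet.)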
+ set.add(new String(CellUtil.cloneQualifier(cell))); + } + return set; + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Add tables to global incremental backup set + * @param tables - set of tables + * @throws IOException exception + */ + public void addIncrementalBackupTableSet(Set<String> tables) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("add incr backup table set to hbase:backup"); + } + Table table = null; + try { + table = connection.getTable(tableName); + Put put = BackupSystemTableHelper.createPutForIncrBackupTableSet(tables); + table.put(put); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Register WAL files as eligible for deletion + * @param files files + * @param backupId backup id + * @throws IOException exception + */ + public void addWALFiles(List<String> files, String backupId) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("add WAL files to hbase:backup"); + } + Table table = null; + try { + table = connection.getTable(tableName); + List<Put> puts = BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId); + table.put(puts); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Check if WAL file is eligible for deletion + * @param file file + * @return true if the file has already been backed up + * @throws IOException exception + */ + public boolean checkWALFile(String file) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Check if WAL file has already been backed up in hbase:backup"); + } + Table table = null; + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForCheckWALFile(file); + Result res = table.get(get); + return !res.isEmpty(); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Checks if we have at least one backup session in hbase:backup. This API is used by + * BackupLogCleaner. + * @return true if at least one session exists in hbase:backup table + * @throws IOException exception + */ + public boolean hasBackupSessions() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("has backup sessions from hbase:backup"); + } + Table table = null; + ResultScanner scanner = null; + boolean result = false; + try { + table = connection.getTable(tableName); + Scan scan = BackupSystemTableHelper.createScanForBackupHistory(); + scan.setMaxVersions(1); + scan.setCaching(1); + scanner = table.getScanner(scan); + if (scanner.next() != null) { + result = true; + } + return result; + } finally { + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } + } + } +}
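[Editor's note: a minimal sketch, not part of the commit, of how a caller might exercise the BackupSystemTable API above. The wrapper class name, WAL path, and backup id are invented for illustration; only the BackupSystemTable calls come from the diff.]

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupSystemTable;

public class BackupSystemTableExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Lazily creates the connection and the hbase:backup table on first use.
    BackupSystemTable backupTable = BackupSystemTable.getTable(conf);
    try {
      // Null means no backup has completed successfully yet.
      String startCode = backupTable.readBackupStartCode();
      System.out.println("last start code: " + startCode);

      // Hypothetical WAL path and backup id. Rows are keyed by the unique
      // WAL file name under the "wals." prefix (see BackupSystemTableHelper).
      String wal = "hdfs://ns1/hbase/oldWALs/rs1.example.com%2C16020%2C1.1396650096738";
      backupTable.addWALFiles(Arrays.asList(wal), "backup_1396650096738");

      // The same path resolves to the same row, so this now returns true.
      boolean backedUp = backupTable.checkWALFile(wal);
      System.out.println("WAL backed up: " + backedUp);
    } finally {
      BackupSystemTable.close(); // closes the shared cluster connection
    }
  }
}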
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java new file mode 100644 index 0000000..bf62a84 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupSystemTableHelper.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; + + +/** + * A collection for methods used by BackupSystemTable. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BackupSystemTableHelper { + + /** + * hbase:backup schema: + * 1. Backup sessions rowkey= "session." + backupId; value = serialized + * BackupContext + * 2. Backup start code rowkey = "startcode"; value = startcode + * 3. Incremental backup set rowkey="incrbackupset"; value=[list of tables] + * 4. Table-RS-timestamp map rowkey="trslm"+ table_name; value = map[RS-> last WAL timestamp] + * 5. RS - WAL ts map rowkey="rslogts."+server; value = last WAL timestamp + * 6. 
WALs recorded rowkey="wals."+WAL unique file name; value = NULL (value is not used) + */ + private static final Log LOG = LogFactory.getLog(BackupSystemTableHelper.class); + + private final static String BACKUP_CONTEXT_PREFIX = "session."; + private final static String START_CODE_ROW = "startcode"; + private final static String INCR_BACKUP_SET = "incrbackupset"; + private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm."; + private final static String RS_LOG_TS_PREFIX = "rslogts."; + private final static String WALS_PREFIX = "wals."; + + private final static byte[] q0 = "0".getBytes(); + private final static byte[] EMPTY_VALUE = new byte[] {}; + + private BackupSystemTableHelper() { + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Creates Put operation for a given backup context object + * @param context backup context + * @return put operation + * @throws IOException exception + */ + static Put createPutForBackupContext(BackupContext context) throws IOException { + + Put put = new Put((BACKUP_CONTEXT_PREFIX + context.getBackupId()).getBytes()); + put.addColumn(BackupSystemTable.familyName, q0, context.toByteArray()); + return put; + } + + /** + * Creates Get operation for a given backup id + * @param backupId - backup's ID + * @return get operation + * @throws IOException exception + */ + static Get createGetForBackupContext(String backupId) throws IOException { + Get get = new Get((BACKUP_CONTEXT_PREFIX + backupId).getBytes()); + get.addFamily(BackupSystemTable.familyName); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Delete operation for a given backup id + * @param backupId - backup's ID + * @return delete operation + * @throws IOException exception + */ + public static Delete createDeletForBackupContext(String backupId) { + Delete del = new Delete((BACKUP_CONTEXT_PREFIX + backupId).getBytes()); + del.addFamily(BackupSystemTable.familyName); + return del; + } + + /** + * Converts Result to BackupContext + * @param res - HBase result + * @return backup context instance + * @throws IOException exception + */ + static BackupContext resultToBackupContext(Result res) throws IOException { + res.advance(); + Cell cell = res.current(); + return cellToBackupContext(cell); + } + + /** + * Creates Get operation to retrieve start code from hbase:backup + * @return get operation + * @throws IOException exception + */ + static Get createGetForStartCode() throws IOException { + Get get = new Get(START_CODE_ROW.getBytes()); + get.addFamily(BackupSystemTable.familyName); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put operation to store start code to hbase:backup + * @return put operation + * @throws IOException exception + */ + static Put createPutForStartCode(String startCode) { + Put put = new Put(START_CODE_ROW.getBytes()); + put.addColumn(BackupSystemTable.familyName, q0, startCode.getBytes()); + return put; + } + + /** + * Creates Get to retrieve incremental backup table set from hbase:backup + * @return get operation + * @throws IOException exception + */ + static Get createGetForIncrBackupTableSet() throws IOException { + Get get = new Get(INCR_BACKUP_SET.getBytes()); + get.addFamily(BackupSystemTable.familyName); + get.setMaxVersions(1); + return get; + } + + /** + * Creates Put to store incremental backup table set + * @param tables tables + * @return put operation + */ + static Put createPutForIncrBackupTableSet(Set<String> tables) { + Put put = new Put(INCR_BACKUP_SET.getBytes()); + for (String table : tables) { + 
put.addColumn(BackupSystemTable.familyName, table.getBytes(), EMPTY_VALUE); + } + return put; + } + + /** + * Creates Scan operation to load backup history + * @return scan operation + */ + static Scan createScanForBackupHistory() { + Scan scan = new Scan(); + byte[] startRow = BACKUP_CONTEXT_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.familyName); + + return scan; + } + + /** + * Converts cell to backup context instance. + * @param current - cell + * @return backup context instance + * @throws IOException exception + */ + static BackupContext cellToBackupContext(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + try { + BackupContext ctxt = BackupContext.fromByteArray(data); + return ctxt; + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** + * Creates Put to write RS last roll log timestamp map + * @param table - table + * @param smap - map, containing RS:ts + * @return put operation + */ + static Put createPutForWriteRegionServerLogTimestamp(String table, String smap) { + Put put = new Put((TABLE_RS_LOG_MAP_PREFIX + table).getBytes()); + put.addColumn(BackupSystemTable.familyName, q0, smap.getBytes()); + return put; + } + + /** + * Creates Scan to load table-> { RS -> ts} map of maps + * @return scan operation + */ + static Scan createScanForReadLogTimestampMap() { + Scan scan = new Scan(); + byte[] startRow = TABLE_RS_LOG_MAP_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.familyName); + + return scan; + } + + /** + * Get table name from rowkey + * @param cloneRow rowkey + * @return table name + */ + static String getTableNameForReadLogTimestampMap(byte[] cloneRow) { + int prefixSize = TABLE_RS_LOG_MAP_PREFIX.length(); + return new String(cloneRow, prefixSize, cloneRow.length - prefixSize); + } + + /** + * Creates Put to store RS last log result + * @param server - server name + * @param fileName - log roll result (timestamp) + * @return put operation + */ + static Put createPutForRegionServerLastLogRollResult(String server, String fileName) { + Put put = new Put((RS_LOG_TS_PREFIX + server).getBytes()); + put.addColumn(BackupSystemTable.familyName, q0, fileName.getBytes()); + return put; + } + + /** + * Creates Scan operation to load last RS log roll results + * @return scan operation + */ + static Scan createScanForReadRegionServerLastLogRollResult() { + Scan scan = new Scan(); + byte[] startRow = RS_LOG_TS_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.familyName); + + return scan; + } + + /** + * Get server's name from rowkey + * @param row - rowkey + * @return server's name + */ + static String getServerNameForReadRegionServerLastLogRollResult(byte[] row) { + int prefixSize = RS_LOG_TS_PREFIX.length(); + return new String(row, prefixSize, row.length - prefixSize); + } + + /** + * Creates put list for list of WAL files + * @param files list of WAL file paths + * @param backupId backup id + * @return put list + * @throws 
IOException exception + */ + public static List<Put> createPutsForAddWALFiles(List<String> files, String backupId) + throws IOException { + + List<Put> puts = new ArrayList<Put>(); + for (String file : files) { + LOG.debug("+++ put: " + BackupUtil.getUniqueWALFileNamePart(file)); + byte[] row = (WALS_PREFIX + BackupUtil.getUniqueWALFileNamePart(file)).getBytes(); + Put put = new Put(row); + put.addColumn(BackupSystemTable.familyName, q0, backupId.getBytes()); + puts.add(put); + } + return puts; + } + + /** + * Creates Get operation for a given wal file name + * @param file file + * @return get operation + * @throws IOException exception + */ + public static Get createGetForCheckWALFile(String file) throws IOException { + byte[] row = (WALS_PREFIX + BackupUtil.getUniqueWALFileNamePart(file)).getBytes(); + Get get = new Get(row); + get.addFamily(BackupSystemTable.familyName); + get.setMaxVersions(1); + return get; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java new file mode 100644 index 0000000..ff8bd2e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupUtil.java @@ -0,0 +1,564 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; + +/** + * A collection for methods used by multiple classes to backup HBase tables. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BackupUtil { + protected static final Log LOG = LogFactory.getLog(BackupUtil.class); + + public static final String FIELD_SEPARATOR = "\001"; + public static final String RECORD_SEPARATOR = "\002"; + public static final String LOGNAME_SEPARATOR = "."; + protected static final String HDFS = "hdfs://"; + protected static Configuration conf = null; + + private BackupUtil(){ + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Set the configuration from a given one. + * @param newConf A new given configuration + */ + public synchronized static void setConf(Configuration newConf) { + conf = newConf; + } + + /** + * Get and merge Hadoop and HBase configuration. + * @throws IOException exception + */ + protected static Configuration getConf() { + if (conf == null) { + conf = new Configuration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create()); + } + return conf; + } + + /** + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value + * for the RS among the tables. 
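 + * For example, {t1: {rs1: 100, rs2: 250}, t2: {rs1: 150}} yields {rs1: 100, rs2: 250}.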
+ * @param rsLogTimestampMap timestamp map + * @return the min timestamp of each RS + */ + protected static HashMap<String, String> getRSLogTimestampMins( + HashMap<String, HashMap<String, String>> rsLogTimestampMap) { + + if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) { + return null; + } + + HashMap<String, String> rsLogTimestamptMins = new HashMap<String, String>(); + HashMap<String, HashMap<String, String>> rsLogTimestampMapByRS = + new HashMap<String, HashMap<String, String>>(); + + for (Entry<String, HashMap<String, String>> tableEntry : rsLogTimestampMap.entrySet()) { + String table = tableEntry.getKey(); + HashMap<String, String> rsLogTimestamp = tableEntry.getValue(); + for (Entry<String, String> rsEntry : rsLogTimestamp.entrySet()) { + String rs = rsEntry.getKey(); + String ts = rsEntry.getValue(); + if (!rsLogTimestampMapByRS.containsKey(rs)) { + rsLogTimestampMapByRS.put(rs, new HashMap<String, String>()); + } + rsLogTimestampMapByRS.get(rs).put(table, ts); + } + } + + for (String rs : rsLogTimestampMapByRS.keySet()) { + rsLogTimestamptMins.put(rs, getMinValue(rsLogTimestampMapByRS.get(rs))); + } + + return rsLogTimestamptMins; + } + + /** + * Get the minimum value among all values in a map. + * @param map map + * @return the min value + */ + protected static String getMinValue(HashMap<String, String> map) { + String minTimestamp = null; + if (map != null) { + ArrayList<String> timestampList = new ArrayList<String>(map.values()); + Collections.sort(timestampList, new Comparator<String>() { + @Override + public int compare(String s1, String s2) { + return Long.compare(Long.parseLong(s1), Long.parseLong(s2)); + } + }); + // The min among all the RS log timestamps will be kept in ZK.
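 + // After the ascending numeric sort above, element 0 is the minimum.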
+ minTimestamp = timestampList.get(0); + } + return minTimestamp; + } + + /** + * Copy out Table RegionInfo into the incremental backup image. TODO: consider moving this + * logic into HBackupFileSystem. + * @param backupContext backup context + * @param conf configuration + * @throws IOException exception + * @throws InterruptedException exception + */ + protected static void copyTableRegionInfo(BackupContext backupContext, Configuration conf) + throws IOException, InterruptedException { + + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + // for each table in the table set, copy out the table info and region info files in the + // correct directory structure + for (String table : backupContext.getTables()) { + + LOG.debug("Attempting to copy table info for: " + table); + TableDescriptor orig = + FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, TableName.valueOf(table)); + + // write a copy of descriptor to the target directory + Path target = new Path(backupContext.getBackupStatus(table).getTargetDir()); + FileSystem targetFs = target.getFileSystem(conf); + FSTableDescriptors descriptors = + new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf)); + descriptors.createTableDescriptorForTableDirectory(target, orig, false); + LOG.debug("Finished copying tableinfo."); + + HBaseAdmin hbadmin = null; + // TODO: optimize + Connection conn = null; + List<HRegionInfo> regions = null; + try { + conn = ConnectionFactory.createConnection(conf); + hbadmin = (HBaseAdmin) conn.getAdmin(); + regions = hbadmin.getTableRegions(TableName.valueOf(table)); + } catch (Exception e) { + throw new BackupException(e); + } finally { + if (hbadmin != null) { + hbadmin.close(); + } + if (conn != null) { + conn.close(); + } + } + + // For each region, write the region info to disk + LOG.debug("Starting to write region info for table " + table); + for (HRegionInfo regionInfo : regions) { + Path regionDir = + HRegion.getRegionDir(new Path(backupContext.getBackupStatus(table).getTargetDir()), + regionInfo); + regionDir = + new Path(backupContext.getBackupStatus(table).getTargetDir(), regionDir.getName()); + writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); + } + LOG.debug("Finished writing region info for table " + table); + } + } + + /** + * Write the .regioninfo file on-disk.
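 + * The file is created with the permissions configured via HConstants.DATA_FILE_UMASK_KEY.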
+ */ + public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs, + final Path regionInfoDir, HRegionInfo regionInfo) throws IOException { + final byte[] content = regionInfo.toDelimitedByteArray(); + Path regionInfoFile = new Path(regionInfoDir, ".regioninfo"); + // First check to get the permissions + FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); + // Write the RegionInfo file content + FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null); + try { + out.write(content); + } finally { + out.close(); + } + } + + /** + * TODO: verify the code + * @param p path + * @return host name + * @throws IOException exception + */ + protected static String parseHostFromOldLog(Path p) throws IOException { + String n = p.getName(); + int idx = n.lastIndexOf(LOGNAME_SEPARATOR); + String s = URLDecoder.decode(n.substring(0, idx), "UTF8"); + return ServerName.parseHostname(s); + } + + public static String parseHostNameFromLogFile(Path p) throws IOException { + if (isArchivedLogFile(p)) { + return parseHostFromOldLog(p); + } else { + return DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname(); + } + } + + private static boolean isArchivedLogFile(Path p) { + String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR; + return p.toString().contains(oldLog); + } + + /** + * Return WAL file name + * @param walFileName WAL file name + * @return WAL file name + * @throws IOException exception + * @throws IllegalArgumentException exception + */ + public static String getUniqueWALFileNamePart(String walFileName) throws IOException { + return new Path(walFileName).getName(); + } + + /** + * Return WAL file name + * @param p - WAL file path + * @return WAL file name + * @throws IOException exception + */ + public static String getUniqueWALFileNamePart(Path p) throws IOException { + return p.getName(); + } + + /** + * Given the log file, parse the timestamp from the file name. The timestamp is the last number. + * @param p a path to the log file + * @return the timestamp + * @throws IOException exception + */ + protected static String getCreationTime(Path p, Configuration conf) throws IOException { + int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR); + if (idx < 0) { + throw new IOException("Cannot parse timestamp from path " + p); + } + String ts = p.getName().substring(idx + 1); + return ts; + } + + /** + * Get the total length of files under the given directory recursively. 
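 + * Sub-directories are descended recursively; only regular file lengths are summed.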
+ * @param fs The hadoop file system + * @param dir The target directory + * @return the total length of files + * @throws IOException exception + */ + public static long getFilesLength(FileSystem fs, Path dir) throws IOException { + long totalLength = 0; + FileStatus[] files = FSUtils.listStatus(fs, dir); + if (files != null) { + for (FileStatus fileStatus : files) { + if (fileStatus.isDir()) { + totalLength += getFilesLength(fs, fileStatus.getPath()); + } else { + totalLength += fileStatus.getLen(); + } + } + } + return totalLength; + } + + /** + * Keep the record for dependency for incremental backup and history info p.s, we may be able to + * merge this class into backupImage class later + */ + public static class BackupCompleteData implements Comparable<BackupCompleteData> { + private String startTime; + private String endTime; + private String type; + private String backupRootPath; + private String tableList; + private String backupToken; + private String bytesCopied; + private List<String> ancestors; + private boolean fromExistingSnapshot = false; + + public List<String> getAncestors() { + if (fromExistingSnapshot) { + return null; + } + if (this.ancestors == null) { + this.ancestors = new ArrayList<String>(); + } + return this.ancestors; + } + + public void addAncestor(String backupToken) { + this.getAncestors().add(backupToken); + } + + public String getBytesCopied() { + return bytesCopied; + } + + public void setBytesCopied(String bytesCopied) { + this.bytesCopied = bytesCopied; + } + + public String getBackupToken() { + return backupToken; + } + + public void setBackupToken(String backupToken) { + this.backupToken = backupToken; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public String getEndTime() { + return endTime; + } + + public void setEndTime(String endTime) { + this.endTime = endTime; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getBackupRootPath() { + return backupRootPath; + } + + public void setBackupRootPath(String backupRootPath) { + this.backupRootPath = backupRootPath; + } + + public String getTableList() { + return tableList; + } + + public void setTableList(String tableList) { + this.tableList = tableList; + } + + public boolean fromExistingSnapshot() { + return this.fromExistingSnapshot; + } + + public void markFromExistingSnapshot() { + this.fromExistingSnapshot = true; + } + + @Override + public int compareTo(BackupCompleteData o) { + Long thisTS = + new Long(this.getBackupToken().substring(this.getBackupToken().lastIndexOf("_") + 1)); + Long otherTS = + new Long(o.getBackupToken().substring(o.getBackupToken().lastIndexOf("_") + 1)); + return thisTS.compareTo(otherTS); + } + + } + + /** + * Sort history list by start time in descending order. 
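 + * Note: the TreeMap below is keyed by start time, so two backups sharing a start time collapse into one entry.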
+ * @param historyList history list + * @return sorted list of BackupCompleteData + */ + public static ArrayList<BackupCompleteData> sortHistoryListDesc( + ArrayList<BackupCompleteData> historyList) { + ArrayList<BackupCompleteData> list = new ArrayList<BackupCompleteData>(); + TreeMap<String, BackupCompleteData> map = new TreeMap<String, BackupCompleteData>(); + for (BackupCompleteData h : historyList) { + map.put(h.getStartTime(), h); + } + Iterator<String> i = map.descendingKeySet().iterator(); + while (i.hasNext()) { + list.add(map.get(i.next())); + } + return list; + } + + /** + * Get list of all WAL files (WALs and archive) + * @param c - configuration + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getListOfWALFiles(Configuration c) throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + FileSystem fs = FileSystem.get(c); + logFiles = getFiles(fs, logDir, logFiles, null); + logFiles = getFiles(fs, oldLogDir, logFiles, null); + return logFiles; + } + + /** + * Get list of all WAL files (WALs and archive) + * @param c - configuration + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getListOfWALFiles(Configuration c, PathFilter filter) + throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + FileSystem fs = FileSystem.get(c); + logFiles = getFiles(fs, logDir, logFiles, filter); + logFiles = getFiles(fs, oldLogDir, logFiles, filter); + return logFiles; + } + + /** + * Get list of all old WAL files (WALs and archive) + * @param c - configuration + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getWALFilesOlderThan(final Configuration c, + final HashMap<String, String> hostTimestampMap) throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + PathFilter filter = new PathFilter() { + + @Override + public boolean accept(Path p) { + try { + if (DefaultWALProvider.isMetaFile(p)) { + return false; + } + String host = BackupUtil.parseHostNameFromLogFile(p); + String oldTimestamp = hostTimestampMap.get(host); + String currentLogTS = getCreationTime(p, c); + if (LOG.isDebugEnabled()) { + LOG.debug("path=" + p); + LOG.debug("oldTimestamp=" + oldTimestamp); + LOG.debug("currentLogTS=" + currentLogTS); + } + return Long.parseLong(currentLogTS) <= Long.parseLong(oldTimestamp); + } catch (IOException e) { + LOG.error(e); + return false; + } + } + }; + FileSystem fs = FileSystem.get(c); + logFiles = getFiles(fs, logDir, logFiles, filter); + logFiles = getFiles(fs, oldLogDir, logFiles, filter); + return logFiles; + } + + private static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files, + PathFilter filter) throws FileNotFoundException, IOException { + + RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true); + + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (lfs.isDirectory()) { + continue; + } + // apply filter + if 
(filter == null || filter.accept(lfs.getPath())) { // a null filter accepts every file + files.add(lfs.getPath().toString()); + LOG.info(lfs.getPath()); + } + } + return files; + } + + public static String concat(Collection<String> col, String separator) { + if (col.size() == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (String s : col) { + sb.append(s + separator); + } + // remove the trailing separator + sb.delete(sb.length() - separator.length(), sb.length()); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java new file mode 100644 index 0000000..74411da --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -0,0 +1,511 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * View to an on-disk Backup Image FileSystem. + * Provides the set of methods necessary to interact with the on-disk Backup Image data.
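 + * The on-disk image layout is backupRoot/namespace/table/backupId (see the path helpers below).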
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class HBackupFileSystem { + public static final Log LOG = LogFactory.getLog(HBackupFileSystem.class); + + private final String RESTORE_TMP_PATH = "/tmp/restoreTemp"; + private final String[] ignoreDirs = { "recovered.edits" }; + + private final Configuration conf; + private final FileSystem fs; + private final Path backupRootPath; + private final String backupId; + + /** + * Create a view to the on-disk Backup Image. + * @param conf to use + * @param backupPath to where the backup Image stored + * @param backupId represent backup Image + */ + HBackupFileSystem(final Configuration conf, final Path backupRootPath, final String backupId) + throws IOException { + this.conf = conf; + this.fs = backupRootPath.getFileSystem(conf); + this.backupRootPath = backupRootPath; + this.backupId = backupId; // the backup ID for the lead backup Image + } + + /** + * @param tableName is the table backuped + * @return {@link HTableDescriptor} saved in backup image of the table + */ + protected HTableDescriptor getTableDesc(String tableName) throws FileNotFoundException, + IOException { + + Path tableInfoPath = this.getTableInfoPath(tableName); + LOG.debug("tableInfoPath = " + tableInfoPath.toString()); + SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath); + LOG.debug("desc = " + desc.getName()); + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc); + HTableDescriptor tableDescriptor = manifest.getTableDescriptor(); + /* + * for HBase 0.96 or 0.98 HTableDescriptor tableDescriptor = + * FSTableDescriptors.getTableDescriptorFromFs(fs, tableInfoPath); + */ + if (!tableDescriptor.getNameAsString().equals(tableName)) { + LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: " + + tableInfoPath.toString()); + LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString()); + } + return tableDescriptor; + } + + /** + * Given the backup root dir, backup id and the table name, return the backup image location, + * which is also where the backup manifest file is. return value look like: + * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/default/t1_dn/backup_1396650096738" + * @param backupRootDir backup root directory + * @param backupId backup id + * @param table table name + * @return backupPath String for the particular table + */ + protected static String getTableBackupDir(String backupRootDir, String backupId, String table) { + TableName tableName = TableName.valueOf(table); + return backupRootDir + File.separator + tableName.getNamespaceAsString() + File.separator + + tableName.getQualifierAsString() + File.separator + backupId; + } + + /** + * Given the backup root dir, backup id and the table name, return the backup image location, + * which is also where the backup manifest file is. 
The return value looks like: + * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/default/t1_dn/backup_1396650096738" + * @param tableN table name + * @return backupPath for the particular table + */ + protected Path getTableBackupPath(String tableN) { + TableName tableName = TableName.valueOf(tableN); + return new Path(this.backupRootPath, tableName.getNamespaceAsString() + File.separator + + tableName.getQualifierAsString() + File.separator + backupId); + } + + /** + * The return value represents a path like: + * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/.hbase-snapshot" + * @param tableName table name + * @return path for snapshot + */ + protected Path getTableSnapshotPath(String tableName) { + return new Path(this.getTableBackupPath(tableName), HConstants.SNAPSHOT_DIR_NAME); + } + + /** + * The return value represents a path like: + * "..../default/t1_dn/backup_1396650096738/.hbase-snapshot/snapshot_1396650097621_default_t1_dn" + * For 0.96 and 0.98 this path contains .snapshotinfo and .tabledesc; for trunk it contains + * .snapshotinfo and data.manifest + * @param tableName table name + * @return path to table info + * @throws FileNotFoundException exception + * @throws IOException exception + */ + protected Path getTableInfoPath(String tableName) throws FileNotFoundException, IOException { + + Path tableSnapShotPath = this.getTableSnapshotPath(tableName); + Path tableInfoPath = null; + + // can't build the path directly as the timestamp values are different + FileStatus[] snapshots = fs.listStatus(tableSnapShotPath); + for (FileStatus snapshot : snapshots) { + tableInfoPath = snapshot.getPath(); + // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest"; + if (tableInfoPath.getName().endsWith("data.manifest")) { + LOG.debug("found snapshot manifest"); + break; + } + } + return tableInfoPath; + } + + /** + * The return value represents a path like: + * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn" + * @param tableName table name + * @return path to table archive + * @throws IOException exception + */ + protected Path getTableArchivePath(String tableName) throws IOException { + Path baseDir = new Path(getTableBackupPath(tableName), HConstants.HFILE_ARCHIVE_DIRECTORY); + Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR); + Path archivePath = new Path(dataDir, TableName.valueOf(tableName).getNamespaceAsString()); + Path tableArchivePath = + new Path(archivePath, TableName.valueOf(tableName).getQualifierAsString()); + if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) { + LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exist"); + tableArchivePath = null; // empty table has no archive + } + return tableArchivePath; + } + + /** + * Given the backup root dir and the backup id, return the log file location for an incremental + * backup.
+ * @param backupRootDir backup root directory + * @param backupId backup id + * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738" + */ + protected static String getLogBackupDir(String backupRootDir, String backupId) { + return backupRootDir + File.separator + HConstants.HREGION_LOGDIR_NAME + File.separator + + backupId; + } + + protected static Path getLogBackupPath(String backupRootDir, String backupId) { + return new Path(getLogBackupDir(backupRootDir, backupId)); + } + + private Path getManifestPath(String tableName) throws IOException { + Path manifestPath = new Path(getTableBackupPath(tableName), BackupManifest.FILE_NAME); + + LOG.debug("Looking for " + manifestPath.toString()); + if (!fs.exists(manifestPath)) { + // check log dir for incremental backup case + manifestPath = + new Path(getLogBackupDir(this.backupRootPath.toString(), this.backupId) + File.separator + + BackupManifest.FILE_NAME); + LOG.debug("Looking for " + manifestPath.toString()); + if (!fs.exists(manifestPath)) { + String errorMsg = + "Could not find backup manifest for " + backupId + " in " + backupRootPath.toString(); + throw new IOException(errorMsg); + } + } + return manifestPath; + } + + protected BackupManifest getManifest(String tableName) throws IOException { + BackupManifest manifest = new BackupManifest(conf, this.getManifestPath(tableName)); + return manifest; + } + + /** + * Gets region list + * @param tableName table name + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + protected ArrayList<Path> getRegionList(String tableName) throws FileNotFoundException, + IOException { + Path tableArchivePath = this.getTableArchivePath(tableName); + ArrayList<Path> regionDirList = new ArrayList<Path>(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // each child here is a region directory + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + + /** + * Gets region list + * @param tableArchivePath table archive path + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + protected ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException, + IOException { + ArrayList<Path> regionDirList = new ArrayList<Path>(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // each child here is a region directory + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + + /** + * Counts the number of files in all subdirectories of an HBase table, i.e. HFiles, and finds the + * maximum number of files in one HBase table.
+
+  /**
+   * Counts the files (HFiles) in all subdirectories of an HBase table and finds the maximum
+   * number of files in any single region directory.
+   * @param tableArchivePath archive path
+   * @return the maximum number of files found in one region of the table
+   * @throws IOException exception
+   */
+  protected int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException {
+    int result = 1;
+    ArrayList<Path> regionPathList = this.getRegionList(tableArchivePath);
+
+    if (regionPathList == null || regionPathList.size() == 0) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + tableArchivePath + "' is not a directory.");
+    }
+
+    for (Path regionPath : regionPathList) {
+      result = Math.max(result, getNumberOfFilesInDir(regionPath));
+    }
+    return result;
+  }
+
+  /**
+   * Counts the number of files, i.e. HFiles, in all subdirectories of an HBase table.
+   * @param regionPath Path to an HBase table directory
+   * @return the total number of files in all subdirectories
+   * @throws IOException exception
+   */
+  protected int getNumberOfFilesInDir(Path regionPath) throws IOException {
+    int result = 0;
+
+    if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + regionPath.toString() + "' is not a directory.");
+    }
+
+    FileStatus[] tableDirContent = fs.listStatus(regionPath);
+    for (FileStatus subDirStatus : tableDirContent) {
+      FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath());
+      for (FileStatus colFamilyStatus : colFamilies) {
+        FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath());
+        result += colFamilyContent.length;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Duplicates the backup image if it is on the local cluster, so that the restore (which moves
+   * files via bulk load) does not damage the only copy.
+   * @see HStore#bulkLoadHFile(String, long)
+   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
+   * @param tableArchivePath archive path
+   * @return the new tableArchivePath
+   * @throws IOException exception
+   */
+  protected Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
+    // Copy the file if it is on the local cluster
+    boolean isCopyNeeded = false;
+
+    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
+    FileSystem desFs = FileSystem.get(conf);
+    if (tableArchivePath.getName().startsWith("/")) {
+      isCopyNeeded = true;
+    } else {
+      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
+      // long)
+      if (srcFs.getUri().equals(desFs.getUri())) {
+        LOG.debug("cluster holding the backup image: " + srcFs.getUri()
+            + "; local cluster node: " + desFs.getUri());
+        isCopyNeeded = true;
+      }
+    }
+    if (isCopyNeeded) {
+      LOG.debug("File " + tableArchivePath + " is on the local cluster, back it up before restore");
+      Path tmpPath = new Path(RESTORE_TMP_PATH);
+      if (desFs.exists(tmpPath)) {
+        try {
+          desFs.delete(tmpPath, true);
+        } catch (IOException e) {
+          LOG.debug("Failed to delete path: " + tmpPath
+              + "; check whether the restore target DFS cluster is healthy");
+        }
+      }
+      FileUtil.copy(srcFs, tableArchivePath, desFs, tmpPath, false, conf);
+      LOG.debug("Copied to temporary path on local cluster: " + tmpPath);
+      tableArchivePath = tmpPath;
+    }
+    return tableArchivePath;
+  }
+
+  /**
+   * Calculates region split boundaries from the HFiles in the given region directories
+   * @param regionDirList region dir list
+   * @return a set of keys that mark the region boundaries
+   */
+  protected byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList)
+      throws FileNotFoundException, IOException {
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    // Build a set of keys to store the boundaries
+    byte[][] keys = null;
+    // calculate region boundaries from the hfiles in each region directory
+    for (Path regionDir : regionDirList) {
+      LOG.debug("Parsing region dir: " + regionDir);
+      Path hfofDir = regionDir;
+
+      if (!fs.exists(hfofDir)) {
+        LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
+      }
+
+      FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+      if (familyDirStatuses == null) {
+        throw new IOException("No families found in " + hfofDir);
+      }
+
+      for (FileStatus stat : familyDirStatuses) {
+        if (!stat.isDirectory()) {
+          LOG.warn("Skipping non-directory " + stat.getPath());
+          continue;
+        }
+        boolean isIgnore = false;
+        String pathName = stat.getPath().getName();
+        for (String ignore : ignoreDirs) {
+          if (pathName.contains(ignore)) {
+            LOG.warn("Skipping non-family directory " + pathName);
+            isIgnore = true;
+            break;
+          }
+        }
+        if (isIgnore) {
+          continue;
+        }
+        Path familyDir = stat.getPath();
+        LOG.debug("Parsing family dir [" + familyDir.toString() + "] in region ["
+            + regionDir + "]");
+        // Skip _logs, etc
+        if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) {
+          continue;
+        }
+
+        // start to parse hfiles inside one family dir
+        Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+        for (Path hfile : hfiles) {
+          if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+              || StoreFileInfo.isReference(hfile.getName())
+              || HFileLink.isHFileLink(hfile.getName())) {
+            continue;
+          }
+          HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf), conf);
+          final byte[] first, last;
+          try {
+            reader.loadFileInfo();
+            first = reader.getFirstRowKey();
+            last = reader.getLastRowKey();
+            LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first="
+                + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+            // To eventually infer start key-end key boundaries
+            Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
+            map.put(first, value + 1);
+            value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+            map.put(last, value - 1);
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    return keys;
+  }
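The +1/-1 bookkeeping above is a sweep-line trick: every HFile votes +1 at its first row key and -1 at its last, so a running sum over the sorted keys is positive exactly inside key ranges covered by at least one file, and LoadIncrementalHFiles.inferBoundaries() turns those counts into split points. A self-contained sketch with two made-up files covering [a..f] and [f..z] (row keys are illustrative only):

import java.util.TreeMap;

import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BoundarySketch {
  public static void main(String[] args) {
    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    // File 1 covers [a..f]: +1 at its first key, -1 at its last key.
    vote(map, Bytes.toBytes("a"), +1);
    vote(map, Bytes.toBytes("f"), -1);
    // File 2 covers [f..z].
    vote(map, Bytes.toBytes("f"), +1);
    vote(map, Bytes.toBytes("z"), -1);

    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
    for (byte[] key : keys) {
      System.out.println(Bytes.toStringBinary(key));
    }
  }

  // Same accumulation generateBoundaryKeys() performs per HFile.
  private static void vote(TreeMap<byte[], Integer> map, byte[] key, int delta) {
    Integer value = map.containsKey(key) ? map.get(key) : 0;
    map.put(key, value + delta);
  }
}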
+
+  /**
+   * Checks whether the backup path exists
+   * @param backupStr backup path string
+   * @param conf configuration
+   * @return true if the path exists
+   * @throws IOException exception
+   */
+  protected static boolean checkPathExist(String backupStr, Configuration conf)
+      throws IOException {
+    boolean isExist = false;
+    Path backupPath = new Path(backupStr);
+    FileSystem fileSys = backupPath.getFileSystem(conf);
+    String targetFsScheme = fileSys.getUri().getScheme();
+    LOG.debug("Scheme of given url: " + backupStr + " is: " + targetFsScheme);
+    if (fileSys.exists(backupPath)) {
+      isExist = true;
+    }
+    return isExist;
+  }
+
+  /**
+   * Checks that each backup image path exists and that there is a manifest file in the path.
+   * @param backupManifestMap If all the manifests are found, then they are put into this map
+   * @param tableArray the tables involved
+   * @throws IOException exception
+   */
+  protected void checkImageManifestExist(HashMap<String, BackupManifest> backupManifestMap,
+      String[] tableArray) throws IOException {
+
+    try {
+      for (String tableName : tableArray) {
+        BackupManifest manifest = this.getManifest(tableName);
+        backupManifestMap.put(tableName, manifest);
+      }
+    } catch (IOException e) {
+      String expMsg = e.getMessage();
+      if (expMsg.contains("No FileSystem for scheme")) {
+        if (expMsg.contains("gpfs")) {
+          LOG.error("Please change to a webhdfs url when "
+              + "the backup image to restore is located on a gpfs cluster");
+        } else {
+          LOG.error("Unsupported filesystem scheme found in the backup target url; "
+              + "please check the url for typos");
+        }
+        throw e;
+      } else if (expMsg.contains("no authority supported")) {
+        LOG.error("Please change to a webhdfs url when "
+            + "the backup image to restore is located on a gpfs cluster");
+        throw e;
+      } else {
+        LOG.error(expMsg);
+        throw e;
+      }
+    }
+  }
+
+  public static String join(String[] names) {
+    StringBuilder sb = new StringBuilder();
+    String sep = BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND;
+    for (String s : names) {
+      sb.append(sep).append(s);
+    }
+    return sb.toString();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
new file mode 100644
index 0000000..e91857f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalBackupManager.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+
+/**
+ * After a full backup has been created, an incremental backup stores only the changes made
+ * since the last full or incremental backup.
+ *
+ * Creating the backup copies the log files in .logs and .oldlogs that were written since the
+ * last backup timestamp.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IncrementalBackupManager {
+  // parent manager
+  private BackupManager backupManager;
+
+  public static final Log LOG = LogFactory.getLog(IncrementalBackupManager.class);
+
+  public IncrementalBackupManager(BackupManager bm) {
+    this.backupManager = bm;
+  }
+
+  /**
+   * Obtains the list of logs that need to be copied out for this incremental backup. The list
+   * is set in BackupContext.
+   * @param backupContext backup context
+   * @return the new HashMap of RS log timestamps after the log roll for this incremental backup
+   * @throws IOException exception
+   */
+  public HashMap<String, String> getIncrBackupLogFileList(BackupContext backupContext)
+      throws IOException {
+    List<String> logList;
+    HashMap<String, String> newTimestamps;
+    HashMap<String, String> previousTimestampMins;
+
+    Configuration conf = BackupUtil.getConf();
+    String savedStartCode = backupManager.readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<String, HashMap<String, String>> previousTimestampMap =
+        backupManager.readLogTimestampMap();
+
+    previousTimestampMins = BackupUtil.getRSLogTimestampMins(previousTimestampMap);
+
+    LOG.debug("StartCode " + savedStartCode + " for backupID " + backupContext.getBackupId());
+    LOG.debug("Timestamps " + previousTimestampMap);
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null ||
+        previousTimestampMins == null ||
+        previousTimestampMins.isEmpty()) {
+      throw new IOException("Cannot read any previous backup timestamps from hbase:backup. "
" + + "In order to create an incremental backup, at least one full backup is needed."); + } + + HBaseAdmin hbadmin = null; + Connection conn = null; + try { + LOG.info("Execute roll log procedure for incremental backup ..."); + conn = ConnectionFactory.createConnection(conf); + hbadmin = (HBaseAdmin) conn.getAdmin(); + hbadmin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, new HashMap<String, String>()); + } finally { + if (hbadmin != null) { + hbadmin.close(); + } + if(conn != null){ + conn.close(); + } + } + + newTimestamps = backupManager.readRegionServerLastLogRollResult(); + + logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); + + backupContext.setIncrBackupFileList(logList); + + return newTimestamps; + } + + /** + * For each region server: get all log files newer than the last timestamps but not newer than the + * newest timestamps. + * @param olderTimestamps the timestamp for each region server of the last backup. + * @param newestTimestamps the timestamp for each region server that the backup should lead to. + * @param conf the Hadoop and Hbase configuration + * @param savedStartCode the startcode (timestamp) of last successful backup. + * @return a list of log files to be backed up + * @throws IOException exception + */ + private List<String> getLogFilesForNewBackup(HashMap<String, String> olderTimestamps, + HashMap<String, String> newestTimestamps, Configuration conf, String savedStartCode) + throws IOException { + LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps + + "\n newestTimestamps: " + newestTimestamps); + Path rootdir = FSUtils.getRootDir(conf); + Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + FileSystem fs = rootdir.getFileSystem(conf); + NewestLogFilter pathFilter = new NewestLogFilter(conf); + + List<String> resultLogFiles = new ArrayList<String>(); + List<String> newestLogs = new ArrayList<String>(); + + /* + * The old region servers and timestamps info we kept in hbase:backup may be out of sync if new + * region server is added or existing one lost. We'll deal with it here when processing the + * logs. If data in hbase:backup has more hosts, just ignore it. If the .logs directory includes + * more hosts, the additional hosts will not have old timestamps to compare with. We'll just use + * all the logs in that directory. We always write up-to-date region server and timestamp info + * to hbase:backup at the end of successful backup. + */ + + FileStatus[] rss; + Path p; + String host; + String oldTimeStamp; + String currentLogFile; + String currentLogTS; + + // Get the files in .logs. + rss = fs.listStatus(logDir); + for (FileStatus rs : rss) { + p = rs.getPath(); + host = DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname(); + FileStatus[] logs; + oldTimeStamp = olderTimestamps.get(host); + // It is possible that there is no old timestamp in hbase:backup for this host if + // this region server is newly added after our last backup. 
+
+  /**
+   * For each region server: get all log files newer than the last timestamps but not newer than
+   * the newest timestamps.
+   * @param olderTimestamps the timestamp for each region server of the last backup
+   * @param newestTimestamps the timestamp for each region server that the backup should lead to
+   * @param conf the Hadoop and HBase configuration
+   * @param savedStartCode the startcode (timestamp) of the last successful backup
+   * @return a list of log files to be backed up
+   * @throws IOException exception
+   */
+  private List<String> getLogFilesForNewBackup(HashMap<String, String> olderTimestamps,
+      HashMap<String, String> newestTimestamps, Configuration conf, String savedStartCode)
+      throws IOException {
+    LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+        + "\n newestTimestamps: " + newestTimestamps);
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path logDir = new Path(rootdir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+    FileSystem fs = rootdir.getFileSystem(conf);
+    NewestLogFilter pathFilter = new NewestLogFilter(conf);
+
+    List<String> resultLogFiles = new ArrayList<String>();
+    List<String> newestLogs = new ArrayList<String>();
+
+    /*
+     * The old region servers and timestamps info we kept in hbase:backup may be out of sync if a
+     * new region server is added or an existing one is lost. We'll deal with it here when
+     * processing the logs. If data in hbase:backup has more hosts, just ignore it. If the .logs
+     * directory includes more hosts, the additional hosts will not have old timestamps to
+     * compare with. We'll just use all the logs in that directory. We always write up-to-date
+     * region server and timestamp info to hbase:backup at the end of a successful backup.
+     */
+
+    FileStatus[] rss;
+    Path p;
+    String host;
+    String oldTimeStamp;
+    String currentLogFile;
+    String currentLogTS;
+
+    // Get the files in .logs.
+    rss = fs.listStatus(logDir);
+    for (FileStatus rs : rss) {
+      p = rs.getPath();
+      host = DefaultWALProvider.getServerNameFromWALDirectoryName(p).getHostname();
+      FileStatus[] logs;
+      oldTimeStamp = olderTimestamps.get(host);
+      // It is possible that there is no old timestamp in hbase:backup for this host if
+      // this region server was newly added after our last backup.
+      if (oldTimeStamp == null) {
+        logs = fs.listStatus(p);
+      } else {
+        pathFilter.setLastBackupTS(oldTimeStamp);
+        logs = fs.listStatus(p, pathFilter);
+      }
+      for (FileStatus log : logs) {
+        LOG.debug("currentLogFile: " + log.getPath().toString());
+        if (DefaultWALProvider.isMetaFile(log.getPath())) {
+          LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
+          continue;
+        }
+        currentLogFile = log.getPath().toString();
+        resultLogFiles.add(currentLogFile);
+        currentLogTS = BackupUtil.getCreationTime(log.getPath(), conf);
+        // newestTimestamps is up-to-date with the current list of hosts,
+        // so newestTimestamps.get(host) will not be null.
+        if (Long.valueOf(currentLogTS) > Long.valueOf(newestTimestamps.get(host))) {
+          newestLogs.add(currentLogFile);
+        }
+      }
+    }
+
+    // Include the .oldlogs files too.
+    FileStatus[] oldlogs = fs.listStatus(oldLogDir);
+    for (FileStatus oldlog : oldlogs) {
+      p = oldlog.getPath();
+      currentLogFile = p.toString();
+      if (DefaultWALProvider.isMetaFile(p)) {
+        LOG.debug("Skip .meta log file: " + currentLogFile);
+        continue;
+      }
+      host = BackupUtil.parseHostFromOldLog(p);
+      currentLogTS = BackupUtil.getCreationTime(p, conf);
+      oldTimeStamp = olderTimestamps.get(host);
+      /*
+       * It is possible that there is no old timestamp in hbase:backup for this host. At the time
+       * of our last backup operation, this rs did not exist. The reason can be one of two: 1. The
+       * rs already left/crashed. Its logs were moved to .oldlogs. 2. The rs was added after our
+       * last backup.
+       */
+      if (oldTimeStamp == null) {
+        if (Long.valueOf(currentLogTS) < Long.valueOf(savedStartCode)) {
+          // This log file is really old; its region server was gone before our last backup.
+          continue;
+        } else {
+          resultLogFiles.add(currentLogFile);
+        }
+      } else if (Long.valueOf(currentLogTS) > Long.valueOf(oldTimeStamp)) {
+        resultLogFiles.add(currentLogFile);
+      }
+
+      LOG.debug("resultLogFiles before removal of newestLogs: " + resultLogFiles);
+      // It is possible that a host in .oldlogs is an obsolete region server,
+      // so newestTimestamps.get(host) here can be null.
+      // Even if these logs belong to an obsolete region server, we still need
+      // to include them to avoid losing edits in the backup.
+      String newTimestamp = newestTimestamps.get(host);
+      if (newTimestamp != null && Long.valueOf(currentLogTS) > Long.valueOf(newTimestamp)) {
+        newestLogs.add(currentLogFile);
+      }
+    }
+    LOG.debug("newestLogs: " + newestLogs);
+    // remove the newest log per host because it is still in use
+    resultLogFiles.removeAll(newestLogs);
+    LOG.debug("resultLogFiles after removal of newestLogs: " + resultLogFiles);
+    return resultLogFiles;
+  }
+
+  class NewestLogFilter implements PathFilter {
+    private String lastBackupTS = "0";
+    private final Configuration conf;
+
+    public NewestLogFilter(Configuration conf) {
+      this.conf = conf;
+    }
+
+    protected void setLastBackupTS(String ts) {
+      this.lastBackupTS = ts;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      // skip meta table log -- ts.meta file
+      if (DefaultWALProvider.isMetaFile(path)) {
+        LOG.debug("Skip .meta log file: " + path.getName());
+        return false;
+      }
+      String timestamp;
+      try {
+        timestamp = BackupUtil.getCreationTime(path, conf);
+        return Long.valueOf(timestamp) > Long.valueOf(lastBackupTS);
+      } catch (IOException e) {
+        LOG.warn("Cannot read timestamp of log file " + path);
+        return false;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
new file mode 100644
index 0000000..72e4879
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface IncrementalRestoreService extends Configurable {
+
+  public void run(String logDirectory, String[] fromTables, String[] toTables)
+      throws IOException;
+}
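Anything plugging into the incremental restore path implements this interface plus Hadoop's Configurable. A minimal conforming skeleton is sketched below; the class name and behavior are illustrative only, and a real service would replay the WAL files under logDirectory into the target tables, mapping fromTables[i] to toTables[i].

package org.apache.hadoop.hbase.backup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

public class NoOpIncrementalRestoreService implements IncrementalRestoreService {
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void run(String logDirectory, String[] fromTables, String[] toTables)
      throws IOException {
    if (fromTables.length != toTables.length) {
      throw new IOException("fromTables and toTables must have the same length");
    }
    // Stub: a real implementation would replay edits from logDirectory here.
  }
}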