http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java new file mode 100644 index 0000000..dc19f9b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java @@ -0,0 +1,144 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; + +/** + * Implementation of a log cleaner that checks if a log is still scheduled for + * incremental backup before deleting it when its TTL is over. 
+ */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupLogCleaner extends BaseLogCleanerDelegate { + private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class); + + private boolean stopped = false; + private Connection conn; + + public BackupLogCleaner() { + } + + @Override + public void init(Map<String, Object> params) { + if (params != null && params.containsKey(HMaster.MASTER)) { + MasterServices master = (MasterServices) params.get(HMaster.MASTER); + conn = master.getConnection(); + if (getConf() == null) { + super.setConf(conn.getConfiguration()); + } + } + if (conn == null) { + try { + conn = ConnectionFactory.createConnection(getConf()); + } catch (IOException ioe) { + throw new RuntimeException("Failed to create connection", ioe); + } + } + } + + @Override + public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { + // all members of this class are null if backup is disabled, + // so we cannot filter the files + if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) { + LOG.warn("Backup is not enabled. 
Check your "+ BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return files; + } + + List<FileStatus> list = new ArrayList<FileStatus>(); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + // If we do not have recorded backup sessions + try { + if (!table.hasBackupSessions()) { + LOG.debug("BackupLogCleaner has no backup sessions"); + return files; + } + } catch (TableNotFoundException tnfe) { + LOG.warn("hbase:backup is not available" + tnfe.getMessage()); + return files; + } + + for(FileStatus file: files){ + String wal = file.getPath().toString(); + boolean logInSystemTable = table.isWALFileDeletable(wal); + if(LOG.isDebugEnabled()) { + if(logInSystemTable) { + LOG.debug("Found log file in hbase:backup, deleting: " + wal); + list.add(file); + } else { + LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal); + } + } + } + return list; + } catch (IOException e) { + LOG.error("Failed to get hbase:backup table, therefore will keep all files", e); + // nothing to delete + return new ArrayList<FileStatus>(); + } + } + + @Override + public void setConf(Configuration config) { + // If backup is disabled, keep all members null + if (!config.getBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, + BackupRestoreConstants.BACKUP_ENABLE_DEFAULT)) { + LOG.warn("Backup is disabled - allowing all wals to be deleted"); + return; + } + super.setConf(config); + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + this.stopped = true; + LOG.info("Stopping BackupLogCleaner"); + } + + @Override + public boolean isStopped() { + return this.stopped; + } + +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java new file mode 100644 index 0000000..99d9d91 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MetricsMaster; +import org.apache.hadoop.hbase.procedure.MasterProcedureManager; +import org.apache.hadoop.hbase.procedure.Procedure; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.zookeeper.KeeperException; + +public class LogRollMasterProcedureManager extends MasterProcedureManager { + + public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; + public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; + private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); + + private MasterServices master; + private ProcedureCoordinator coordinator; + private boolean done; + + @Override + public void stop(String why) { + LOG.info("stop: " + why); + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void initialize(MasterServices master, MetricsMaster metricsMaster) + throws KeeperException, IOException, 
UnsupportedOperationException { + this.master = master; + this.done = false; + + // setup the default procedure coordinator + String name = master.getServerName().toString(); + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory + .getCoordinatedStateManager(master.getConfiguration()); + coordManager.initialize(master); + + ProcedureCoordinatorRpcs comms = + coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); + + this.coordinator = new ProcedureCoordinator(comms, tpool); + } + + @Override + public String getProcedureSignature() { + return ROLLLOG_PROCEDURE_SIGNATURE; + } + + @Override + public void execProcedure(ProcedureDescription desc) throws IOException { + if (!isBackupEnabled()) { + LOG.warn("Backup is not enabled. Check your "+ BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + this.done = false; + // start the process on the RS + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); + List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); + List<String> servers = new ArrayList<String>(); + for (ServerName sn : serverNames) { + servers.add(sn.toString()); + } + + List<NameStringPair> conf = desc.getConfigurationList(); + byte[] data = new byte[0]; + if(conf.size() > 0){ + // Get backup root path + data = conf.get(0).getValue().getBytes(); + } + Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers); + if (proc == null) { + String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; + LOG.error(msg); + throw new IOException(msg); + } + + try { + // wait for the procedure to complete. A timer thread is kicked off that should cancel this + // if it takes too long. 
+ proc.waitForCompleted(); + LOG.info("Done waiting - exec procedure for " + desc.getInstance()); + LOG.info("Distributed roll log procedure is successful!"); + this.done = true; + } catch (InterruptedException e) { + ForeignException ee = + new ForeignException("Interrupted while waiting for roll log procdure to finish", e); + monitor.receive(ee); + Thread.currentThread().interrupt(); + } catch (ForeignException e) { + ForeignException ee = + new ForeignException("Exception while waiting for roll log procdure to finish", e); + monitor.receive(ee); + } + monitor.rethrowException(); + } + + private boolean isBackupEnabled() { + return BackupManager.isBackupEnabled(master.getConfiguration()); + } + + @Override + public boolean isProcedureDone(ProcedureDescription desc) throws IOException { + return done; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java new file mode 100644 index 0000000..e2c0d69 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; + +/** + * This backup subprocedure implementation forces a log roll on the RS. 
+ */ +public class LogRollBackupSubprocedure extends Subprocedure { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class); + + private final RegionServerServices rss; + private final LogRollBackupSubprocedurePool taskManager; + private FSHLog hlog; + private String backupRoot; + + public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, + ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, + LogRollBackupSubprocedurePool taskManager, byte[] data) { + + super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, + wakeFrequency, timeout); + LOG.info("Constructing a LogRollBackupSubprocedure."); + this.rss = rss; + this.taskManager = taskManager; + if(data != null) { + backupRoot = new String(data); + } + } + + /** + * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified + * with no use of subprocedurepool. + */ + class RSRollLogTask implements Callable<Void> { + RSRollLogTask() { + } + + @Override + public Void call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("++ DRPC started: " + rss.getServerName()); + } + hlog = (FSHLog) rss.getWAL(null); + long filenum = hlog.getFilenum(); + List<WAL> wals = rss.getWALs(); + long highest = -1; + for (WAL wal : wals) { + if (wal == null) continue; + if (((AbstractFSWAL)wal).getFilenum() > highest) { + highest = ((AbstractFSWAL)wal).getFilenum(); + } + } + + LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum + + " highest: " + highest + " on " + rss.getServerName()); + ((HRegionServer)rss).walRoller.requestRollAll(); + long start = EnvironmentEdgeManager.currentTime(); + while (!((HRegionServer)rss).walRoller.walRollFinished()) { + Thread.sleep(20); + } + LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime()-start)); + LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum() + + " on " + 
rss.getServerName()); + + Connection connection = rss.getConnection(); + try(final BackupSystemTable table = new BackupSystemTable(connection)) { + // sanity check, good for testing + HashMap<String, Long> serverTimestampMap = + table.readRegionServerLastLogRollResult(backupRoot); + String host = rss.getServerName().getHostname(); + int port = rss.getServerName().getPort(); + String server = host + ":" + port; + Long sts = serverTimestampMap.get(host); + if (sts != null && sts > highest) { + LOG.warn("Won't update server's last roll log result: current=" + + sts + " new=" + highest); + return null; + } + // write the log number to hbase:backup. + table.writeRegionServerLastLogRollResult(server, highest, backupRoot); + return null; + } catch (Exception e) { + LOG.error(e); + throw e; + } + } + } + + private void rolllog() throws ForeignException { + monitor.rethrowException(); + + taskManager.submitTask(new RSRollLogTask()); + monitor.rethrowException(); + + // wait for everything to complete. + taskManager.waitForOutstandingTasks(); + monitor.rethrowException(); + + } + + @Override + public void acquireBarrier() throws ForeignException { + // do nothing, executing in inside barrier step. + } + + /** + * do a log roll. + * @return some bytes + */ + @Override + public byte[] insideBarrier() throws ForeignException { + rolllog(); + return null; + } + + /** + * Cancel threads if they haven't finished. + */ + @Override + public void cleanup(Exception e) { + taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); + } + + /** + * Hooray! 
+ */ + public void releaseBarrier() { + // NO OP + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java new file mode 100644 index 0000000..1ca638c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.errorhandling.ForeignException; + +/** + * Handle running each of the individual tasks for completing a backup procedure + * on a regionserver. + */ +public class LogRollBackupSubprocedurePool implements Closeable, Abortable { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class); + + /** Maximum number of concurrent snapshot region tasks that can run concurrently */ + private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks"; + private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3; + + private final ExecutorCompletionService<Void> taskPool; + private final ThreadPoolExecutor executor; + private volatile boolean aborted; + private final List<Future<Void>> futures = new ArrayList<Future<Void>>(); + private final String name; + + public LogRollBackupSubprocedurePool(String name, Configuration conf) { + // configure the executor service + long keepAlive = + conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY, + LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS); + this.name = name; + executor = + new ThreadPoolExecutor(1, threads, keepAlive, 
TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name + + ")-backup-pool")); + taskPool = new ExecutorCompletionService<Void>(executor); + } + + /** + * Submit a task to the pool. + */ + public void submitTask(final Callable<Void> task) { + Future<Void> f = this.taskPool.submit(task); + futures.add(f); + } + + /** + * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} + * @return <tt>true</tt> on success, <tt>false</tt> otherwise + * @throws ForeignException exception + */ + public boolean waitForOutstandingTasks() throws ForeignException { + LOG.debug("Waiting for backup procedure to finish."); + + try { + for (Future<Void> f : futures) { + f.get(); + } + return true; + } catch (InterruptedException e) { + if (aborted) { + throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", + e); + } + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + if (e.getCause() instanceof ForeignException) { + throw (ForeignException) e.getCause(); + } + throw new ForeignException(name, e.getCause()); + } finally { + // close off remaining tasks + for (Future<Void> f : futures) { + if (!f.isDone()) { + f.cancel(true); + } + } + } + return false; + } + + /** + * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly + * finish + */ + @Override + public void close() { + executor.shutdown(); + } + + @Override + public void abort(String why, Throwable e) { + if (this.aborted) { + return; + } + + this.aborted = true; + LOG.warn("Aborting because: " + why, e); + this.executor.shutdownNow(); + } + + @Override + public boolean isAborted() { + return this.aborted; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java new file mode 100644 index 0000000..23270fd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + + +import java.io.IOException; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.SubprocedureFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.zookeeper.KeeperException; + +/** + * This manager class handles the work dealing with backup for a {@link HRegionServer}. + * <p> + * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is + * responsible by this region server. If any failures occur with the subprocedure, the manager's + * procedure member notifies the procedure coordinator to abort all others. + * <p> + * On startup, requires {@link #start()} to be called. 
+ * <p> + * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be + * called + */ +public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { + + private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class); + + /** Conf key for number of request threads to start backup on regionservers */ + public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; + /** # of threads for backup work on the rs. */ + public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; + + public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; + public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; + + /** Conf key for millis between checks to see if backup work completed or if there are errors */ + public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; + /** Default amount of time to check for errors while regions finish backup work */ + private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; + + private RegionServerServices rss; + private ProcedureMemberRpcs memberRpcs; + private ProcedureMember member; + private boolean started = false; + + /** + * Create a default backup procedure manager + */ + public LogRollRegionServerProcedureManager() { + } + + /** + * Start accepting backup procedure requests. + */ + @Override + public void start() { + if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { + LOG.warn("Backup is not enabled. 
Check your "+ BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + this.memberRpcs.start(rss.getServerName().toString(), member); + started = true; + LOG.info("Started region server backup manager."); + } + + /** + * Close <tt>this</tt> and all running backup procedure tasks + * @param force forcefully stop all running tasks + * @throws IOException exception + */ + @Override + public void stop(boolean force) throws IOException { + if (!started) { + return; + } + String mode = force ? "abruptly" : "gracefully"; + LOG.info("Stopping RegionServerBackupManager " + mode + "."); + + try { + this.member.close(); + } finally { + this.memberRpcs.close(); + } + } + + /** + * If in a running state, creates the specified subprocedure for handling a backup procedure. + * @return Subprocedure to submit to the ProcedureMemeber. + */ + public Subprocedure buildSubprocedure(byte[] data) { + + // don't run a backup if the parent is stop(ping) + if (rss.isStopping() || rss.isStopped()) { + throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() + + ", because stopping/stopped!"); + } + + LOG.info("Attempting to run a roll log procedure for backup."); + ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); + Configuration conf = rss.getConfiguration(); + long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + long wakeMillis = + conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); + + LogRollBackupSubprocedurePool taskManager = + new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); + return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, + taskManager, data); + + } + + /** + * Build the actual backup procedure runner that will do all the 'hard' work + */ + public class BackupSubprocedureBuilder implements SubprocedureFactory { + + @Override + public Subprocedure 
buildSubprocedure(String name, byte[] data) { + return LogRollRegionServerProcedureManager.this.buildSubprocedure(data); + } + } + + @Override + public void initialize(RegionServerServices rss) throws KeeperException { + this.rss = rss; + if (!BackupManager.isBackupEnabled(rss.getConfiguration())) { + LOG.warn("Backup is not enabled. Check your "+ BackupRestoreConstants.BACKUP_ENABLE_KEY + + " setting"); + return; + } + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss + .getConfiguration()); + coordManager.initialize(rss); + this.memberRpcs = + coordManager + .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); + + // read in the backup handler configuration properties + Configuration conf = rss.getConfiguration(); + long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); + // create the actual cohort member + ThreadPoolExecutor pool = + ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); + this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); + } + + @Override + public String getProcedureSignature() { + return "backup-proc"; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java new file mode 100644 index 0000000..81dd833 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupClientUtil.java @@ -0,0 +1,437 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A collection of methods used by multiple classes to backup HBase tables. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BackupClientUtil { + protected static final Log LOG = LogFactory.getLog(BackupClientUtil.class); + public static final String LOGNAME_SEPARATOR = "."; + + private BackupClientUtil() { + throw new AssertionError("Instantiating utility class..."); + } + + /** + * Check whether the backup path exist + * @param backupStr backup + * @param conf configuration + * @return Yes if path exists + * @throws IOException exception + */ + public static boolean checkPathExist(String backupStr, Configuration conf) throws IOException { + boolean isExist = false; + Path backupPath = new Path(backupStr); + FileSystem fileSys = backupPath.getFileSystem(conf); + String targetFsScheme = fileSys.getUri().getScheme(); + if (LOG.isTraceEnabled()) { + LOG.trace("Schema of given url: " + backupStr + " is: " + targetFsScheme); + } + if (fileSys.exists(backupPath)) { + isExist = true; + } + return isExist; + } + + // check target path first, confirm it doesn't exist before backup + public static void checkTargetDir(String backupRootPath, Configuration conf) throws IOException { + boolean targetExists = false; + try { + targetExists = checkPathExist(backupRootPath, conf); + } catch (IOException e) { + String expMsg = e.getMessage(); + String newMsg = null; + if (expMsg.contains("No FileSystem for scheme")) { + newMsg = + "Unsupported filesystem scheme found in the backup target url. Error Message: " + + newMsg; + LOG.error(newMsg); + throw new IOException(newMsg); + } else { + throw e; + } + } + + if (targetExists) { + LOG.info("Using existing backup root dir: " + backupRootPath); + } else { + LOG.info("Backup root dir " + backupRootPath + " does not exist. Will be created."); + } + } + + /** + * Get the min value for all the Values a map. 
+ * @param map map + * @return the min value + */ + public static <T> Long getMinValue(HashMap<T, Long> map) { + Long minTimestamp = null; + if (map != null) { + ArrayList<Long> timestampList = new ArrayList<Long>(map.values()); + Collections.sort(timestampList); + // The min among all the RS log timestamps will be kept in hbase:backup table. + minTimestamp = timestampList.get(0); + } + return minTimestamp; + } + + /** + * Parses host name:port from archived WAL path + * @param p path + * @return host name + * @throws IOException exception + */ + public static String parseHostFromOldLog(Path p) { + try { + String n = p.getName(); + int idx = n.lastIndexOf(LOGNAME_SEPARATOR); + String s = URLDecoder.decode(n.substring(0, idx), "UTF8"); + return ServerName.parseHostname(s) + ":" + ServerName.parsePort(s); + } catch (Exception e) { + LOG.warn("Skip log file (can't parse): " + p); + return null; + } + } + + /** + * Given the log file, parse the timestamp from the file name. The timestamp is the last number. 
+ * @param p a path to the log file + * @return the timestamp + * @throws IOException exception + */ + public static Long getCreationTime(Path p) throws IOException { + int idx = p.getName().lastIndexOf(LOGNAME_SEPARATOR); + if (idx < 0) { + throw new IOException("Cannot parse timestamp from path " + p); + } + String ts = p.getName().substring(idx + 1); + return Long.parseLong(ts); + } + + public static List<String> getFiles(FileSystem fs, Path rootDir, List<String> files, + PathFilter filter) throws FileNotFoundException, IOException { + RemoteIterator<LocatedFileStatus> it = fs.listFiles(rootDir, true); + + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (lfs.isDirectory()) { + continue; + } + // apply filter + if (filter.accept(lfs.getPath())) { + files.add(lfs.getPath().toString()); + } + } + return files; + } + + public static void cleanupBackupData(BackupInfo context, Configuration conf) throws IOException { + cleanupHLogDir(context, conf); + cleanupTargetDir(context, conf); + } + + /** + * Clean up directories which are generated when DistCp copying hlogs. 
+ * @throws IOException + */ + private static void cleanupHLogDir(BackupInfo backupContext, Configuration conf) + throws IOException { + + String logDir = backupContext.getHLogTargetDir(); + if (logDir == null) { + LOG.warn("No log directory specified for " + backupContext.getBackupId()); + return; + } + + Path rootPath = new Path(logDir).getParent(); + FileSystem fs = FileSystem.get(rootPath.toUri(), conf); + FileStatus[] files = listStatus(fs, rootPath, null); + if (files == null) { + return; + } + for (FileStatus file : files) { + LOG.debug("Delete log files: " + file.getPath().getName()); + fs.delete(file.getPath(), true); + } + } + + /** + * Clean up the data at target directory + */ + private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { + try { + // clean up the data at target directory + LOG.debug("Trying to cleanup up target dir : " + backupInfo.getBackupId()); + String targetDir = backupInfo.getTargetRootDir(); + if (targetDir == null) { + LOG.warn("No target directory specified for " + backupInfo.getBackupId()); + return; + } + + FileSystem outputFs = FileSystem.get(new Path(backupInfo.getTargetRootDir()).toUri(), conf); + + for (TableName table : backupInfo.getTables()) { + Path targetDirPath = + new Path(getTableBackupDir(backupInfo.getTargetRootDir(), backupInfo.getBackupId(), + table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done."); + } else { + LOG.info("No data has been found in " + targetDirPath.toString() + "."); + } + + Path tableDir = targetDirPath.getParent(); + FileStatus[] backups = listStatus(outputFs, tableDir, null); + if (backups == null || backups.length == 0) { + outputFs.delete(tableDir, true); + LOG.debug(tableDir.toString() + " is empty, remove it."); + } + } + outputFs.delete(new Path(targetDir, backupInfo.getBackupId()), true); + } catch (IOException e1) { + LOG.error("Cleaning up backup data of " + backupInfo.getBackupId() 
+ " at " + + backupInfo.getTargetRootDir() + " failed due to " + e1.getMessage() + "."); + } + } + + /** + * Given the backup root dir, backup id and the table name, return the backup image location, + * which is also where the backup manifest file is. return value look like: + * "hdfs://backup.hbase.org:9000/user/biadmin/backup1/backup_1396650096738/default/t1_dn/" + * @param backupRootDir backup root directory + * @param backupId backup id + * @param tableName table name + * @return backupPath String for the particular table + */ + public static String + getTableBackupDir(String backupRootDir, String backupId, TableName tableName) { + return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString() + + Path.SEPARATOR; + } + + public static TableName[] parseTableNames(String tables) { + if (tables == null) { + return null; + } + String[] tableArray = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + + TableName[] ret = new TableName[tableArray.length]; + for (int i = 0; i < tableArray.length; i++) { + ret[i] = TableName.valueOf(tableArray[i]); + } + return ret; + } + + /** + * Sort history list by start time in descending order. 
+ * @param historyList history list + * @return sorted list of BackupCompleteData + */ + public static ArrayList<BackupInfo> sortHistoryListDesc(ArrayList<BackupInfo> historyList) { + ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(); + TreeMap<String, BackupInfo> map = new TreeMap<String, BackupInfo>(); + for (BackupInfo h : historyList) { + map.put(Long.toString(h.getStartTs()), h); + } + Iterator<String> i = map.descendingKeySet().iterator(); + while (i.hasNext()) { + list.add(map.get(i.next())); + } + return list; + } + + /** + * Returns WAL file name + * @param walFileName WAL file name + * @return WAL file name + * @throws IOException exception + * @throws IllegalArgumentException exception + */ + public static String getUniqueWALFileNamePart(String walFileName) throws IOException { + return getUniqueWALFileNamePart(new Path(walFileName)); + } + + /** + * Returns WAL file name + * @param p - WAL file path + * @return WAL file name + * @throws IOException exception + */ + public static String getUniqueWALFileNamePart(Path p) throws IOException { + return p.getName(); + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates + * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, + * and return an empty FileStatus[] while Hadoop 2 will throw FileNotFoundException. + * @param fs file system + * @param dir directory + * @param filter path filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus array + */ + public static FileStatus[] + listStatus(final FileSystem fs, final Path dir, final PathFilter filter) throws IOException { + FileStatus[] status = null; + try { + status = filter == null ? 
fs.listStatus(dir) : fs.listStatus(dir, filter); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + if (status == null || status.length < 1) return null; + return status; + } + + /** + * Return the 'path' component of a Path. In Hadoop, Path is an URI. This method returns the + * 'path' component of a Path's URI: e.g. If a Path is + * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>, this method returns + * <code>/hbase_trunk/TestTable/compaction.dir</code>. This method is useful if you want to print + * out a Path without qualifying Filesystem instance. + * @param p Filesystem Path whose 'path' component we are to return. + * @return Path portion of the Filesystem + */ + public static String getPath(Path p) { + return p.toUri().getPath(); + } + + /** + * Given the backup root dir and the backup id, return the log file location for an incremental + * backup. 
+ * @param backupRootDir backup root directory + * @param backupId backup id + * @return logBackupDir: ".../user/biadmin/backup1/WALs/backup_1396650096738" + */ + public static String getLogBackupDir(String backupRootDir, String backupId) { + return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + + HConstants.HREGION_LOGDIR_NAME; + } + + private static List<BackupInfo> getHistory(Configuration conf, Path backupRootPath) + throws IOException { + // Get all (n) history from backup root destination + FileSystem fs = FileSystem.get(conf); + RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(backupRootPath); + + List<BackupInfo> infos = new ArrayList<BackupInfo>(); + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (!lfs.isDirectory()) continue; + String backupId = lfs.getPath().getName(); + try { + BackupInfo info = loadBackupInfo(backupRootPath, backupId, fs); + infos.add(info); + } catch(IOException e) { + LOG.error("Can not load backup info from: "+ lfs.getPath(), e); + } + } + // Sort + Collections.sort(infos, new Comparator<BackupInfo>() { + + @Override + public int compare(BackupInfo o1, BackupInfo o2) { + long ts1 = getTimestamp(o1.getBackupId()); + long ts2 = getTimestamp(o2.getBackupId()); + if (ts1 == ts2) return 0; + return ts1 < ts2 ? 1 : -1; + } + + private long getTimestamp(String backupId) { + String[] split = backupId.split("_"); + return Long.parseLong(split[1]); + } + }); + return infos; + } + + public static List<BackupInfo> getHistory(Configuration conf, int n, Path backupRootPath, + BackupInfo.Filter... 
filters) throws IOException { + List<BackupInfo> infos = getHistory(conf, backupRootPath); + List<BackupInfo> ret = new ArrayList<BackupInfo>(); + for (BackupInfo info : infos) { + if (ret.size() == n) { + break; + } + boolean passed = true; + for (int i = 0; i < filters.length; i++) { + if (!filters[i].apply(info)) { + passed = false; + break; + } + } + if (passed) { + ret.add(info); + } + } + return ret; + } + + public static BackupInfo loadBackupInfo(Path backupRootPath, String backupId, FileSystem fs) + throws IOException { + Path backupPath = new Path(backupRootPath, backupId); + + RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupPath, true); + while (it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if (lfs.getPath().getName().equals(BackupManifest.MANIFEST_FILE_NAME)) { + // Load BackupManifest + BackupManifest manifest = new BackupManifest(fs, lfs.getPath().getParent()); + BackupInfo info = manifest.toBackupInfo(); + return info; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java new file mode 100644 index 0000000..72e598f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.util; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; +import 
org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; + +/** + * A collection for methods used by multiple classes to backup HBase tables. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class BackupServerUtil { + protected static final Log LOG = LogFactory.getLog(BackupServerUtil.class); + public static final String LOGNAME_SEPARATOR = "."; + + private BackupServerUtil(){ + throw new AssertionError("Instantiating utility class..."); + } + + public static void waitForSnapshot(SnapshotDescription snapshot, long max, + SnapshotManager snapshotMgr, Configuration conf) throws IOException { + boolean done = false; + long start = EnvironmentEdgeManager.currentTime(); + int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + long maxPauseTime = max / numRetries; + int tries = 0; + LOG.debug("Waiting a max of " + max + " ms for snapshot '" + + ClientSnapshotDescriptionUtils.toString(snapshot) + "'' to complete. (max " + + maxPauseTime + " ms per retry)"); + while (tries == 0 + || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) { + try { + // sleep a backoff <= pauseTime amount + long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long sleep = getPauseTime(tries++, pause); + sleep = sleep > maxPauseTime ? 
maxPauseTime : sleep; + LOG.debug("(#" + tries + ") Sleeping: " + sleep + + "ms while waiting for snapshot completion."); + Thread.sleep(sleep); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e); + } + LOG.debug("Getting current status of snapshot ..."); + done = snapshotMgr.isSnapshotDone(snapshot); + } + if (!done) { + throw new IOException("Snapshot '" + snapshot.getName() + + "' wasn't completed in expectedTime:" + max + " ms"); + } + } + + public static long getPauseTime(int tries, long pause) { + int triesCount = tries; + if (triesCount >= HConstants.RETRY_BACKOFF.length) { + triesCount = HConstants.RETRY_BACKOFF.length - 1; + } + return pause * HConstants.RETRY_BACKOFF[triesCount]; + } + /** + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp + * value for the RS among the tables. + * @param rsLogTimestampMap timestamp map + * @return the min timestamp of each RS + */ + public static HashMap<String, Long> getRSLogTimestampMins( + HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) { + + if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) { + return null; + } + + HashMap<String, Long> rsLogTimestampMins = new HashMap<String, Long>(); + HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = + new HashMap<String, HashMap<TableName, Long>>(); + + for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) { + TableName table = tableEntry.getKey(); + HashMap<String, Long> rsLogTimestamp = tableEntry.getValue(); + for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) { + String rs = rsEntry.getKey(); + Long ts = rsEntry.getValue(); + if (!rsLogTimestampMapByRS.containsKey(rs)) { + rsLogTimestampMapByRS.put(rs, new HashMap<TableName, Long>()); + rsLogTimestampMapByRS.get(rs).put(table, ts); + } else { + rsLogTimestampMapByRS.get(rs).put(table, ts); + } + } + } + + for (Entry<String, 
HashMap<TableName, Long>> entry : rsLogTimestampMapByRS.entrySet()) { + String rs = entry.getKey(); + rsLogTimestampMins.put(rs, BackupClientUtil.getMinValue(entry.getValue())); + } + + return rsLogTimestampMins; + } + + /** + * copy out Table RegionInfo into incremental backup image need to consider move this logic into + * HBackupFileSystem + * @param conn connection + * @param backupContext backup context + * @param conf configuration + * @throws IOException exception + * @throws InterruptedException exception + */ + public static void copyTableRegionInfo(Connection conn, BackupInfo backupContext, + Configuration conf) throws IOException, InterruptedException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + // for each table in the table set, copy out the table info and region + // info files in the correct directory structure + for (TableName table : backupContext.getTables()) { + + if(!MetaTableAccessor.tableExists(conn, table)) { + LOG.warn("Table "+ table+" does not exists, skipping it."); + continue; + } + HTableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table); + + // write a copy of descriptor to the target directory + Path target = new Path(backupContext.getBackupStatus(table).getTargetDir()); + FileSystem targetFs = target.getFileSystem(conf); + FSTableDescriptors descriptors = + new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf)); + descriptors.createTableDescriptorForTableDirectory(target, orig, false); + LOG.debug("Attempting to copy table info for:" + table + " target: " + target + + " descriptor: " + orig); + LOG.debug("Finished copying tableinfo."); + List<HRegionInfo> regions = null; + regions = MetaTableAccessor.getTableRegions(conn, table); + // For each region, write the region info to disk + LOG.debug("Starting to write region info for table " + table); + for (HRegionInfo regionInfo : regions) { + Path regionDir = + HRegion.getRegionDir(new 
Path(backupContext.getBackupStatus(table).getTargetDir()), + regionInfo); + regionDir = + new Path(backupContext.getBackupStatus(table).getTargetDir(), regionDir.getName()); + writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); + } + LOG.debug("Finished writing region info for table " + table); + } + } + + /** + * Write the .regioninfo file on-disk. + */ + public static void writeRegioninfoOnFilesystem(final Configuration conf, final FileSystem fs, + final Path regionInfoDir, HRegionInfo regionInfo) throws IOException { + final byte[] content = regionInfo.toDelimitedByteArray(); + Path regionInfoFile = new Path(regionInfoDir, ".regioninfo"); + // First check to get the permissions + FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); + // Write the RegionInfo file content + FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null); + try { + out.write(content); + } finally { + out.close(); + } + } + + /** + * Parses hostname:port from WAL file path + * @param p path to WAL file + * @return hostname:port + */ + public static String parseHostNameFromLogFile(Path p) { + try { + if (isArchivedLogFile(p)) { + return BackupClientUtil.parseHostFromOldLog(p); + } else { + ServerName sname = AbstractFSWALProvider.getServerNameFromWALDirectoryName(p); + return sname.getHostname() + ":" + sname.getPort(); + } + } catch (Exception e) { + LOG.warn("Skip log file (can't parse): " + p); + return null; + } + } + + private static boolean isArchivedLogFile(Path p) { + String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR; + return p.toString().contains(oldLog); + } + + /** + * Returns WAL file name + * @param walFileName WAL file name + * @return WAL file name + * @throws IOException exception + * @throws IllegalArgumentException exception + */ + public static String getUniqueWALFileNamePart(String walFileName) throws IOException { + return getUniqueWALFileNamePart(new 
Path(walFileName)); + } + + /** + * Returns WAL file name + * @param p - WAL file path + * @return WAL file name + * @throws IOException exception + */ + public static String getUniqueWALFileNamePart(Path p) throws IOException { + return p.getName(); + } + + /** + * Get the total length of files under the given directory recursively. + * @param fs The hadoop file system + * @param dir The target directory + * @return the total length of files + * @throws IOException exception + */ + public static long getFilesLength(FileSystem fs, Path dir) throws IOException { + long totalLength = 0; + FileStatus[] files = FSUtils.listStatus(fs, dir); + if (files != null) { + for (FileStatus fileStatus : files) { + if (fileStatus.isDirectory()) { + totalLength += getFilesLength(fs, fileStatus.getPath()); + } else { + totalLength += fileStatus.getLen(); + } + } + } + return totalLength; + } + + + + /** + * Sort history list by start time in descending order. + * @param historyList history list + * @return sorted list of BackupCompleteData + */ + public static ArrayList<BackupInfo> sortHistoryListDesc( + ArrayList<BackupInfo> historyList) { + ArrayList<BackupInfo> list = new ArrayList<BackupInfo>(); + TreeMap<String, BackupInfo> map = new TreeMap<String, BackupInfo>(); + for (BackupInfo h : historyList) { + map.put(Long.toString(h.getStartTs()), h); + } + Iterator<String> i = map.descendingKeySet().iterator(); + while (i.hasNext()) { + list.add(map.get(i.next())); + } + return list; + } + + /** + * Get list of all WAL files (WALs and archive) + * @param c - configuration + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getListOfWALFiles(Configuration c) throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + FileSystem fs = 
FileSystem.get(c); + logFiles = BackupClientUtil.getFiles(fs, logDir, logFiles, null); + logFiles = BackupClientUtil.getFiles(fs, oldLogDir, logFiles, null); + return logFiles; + } + + /** + * Get list of all WAL files (WALs and archive) + * @param c - configuration + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getListOfWALFiles(Configuration c, PathFilter filter) + throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + FileSystem fs = FileSystem.get(c); + logFiles = BackupClientUtil.getFiles(fs, logDir, logFiles, filter); + logFiles = BackupClientUtil.getFiles(fs, oldLogDir, logFiles, filter); + return logFiles; + } + + /** + * Get list of all old WAL files (WALs and archive) + * @param c - configuration + * @param hostTimestampMap - host timestamp map + * @return list of WAL files + * @throws IOException exception + */ + public static List<String> getWALFilesOlderThan(final Configuration c, + final HashMap<String, Long> hostTimestampMap) throws IOException { + Path rootDir = FSUtils.getRootDir(c); + Path logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + List<String> logFiles = new ArrayList<String>(); + + PathFilter filter = new PathFilter() { + + @Override + public boolean accept(Path p) { + try { + if (AbstractFSWALProvider.isMetaFile(p)) { + return false; + } + String host = parseHostNameFromLogFile(p); + if(host == null) { + return false; + } + Long oldTimestamp = hostTimestampMap.get(host); + Long currentLogTS = BackupClientUtil.getCreationTime(p); + return currentLogTS <= oldTimestamp; + } catch (Exception e) { + LOG.warn("Can not parse"+ p, e); + return false; + } + } + }; + FileSystem fs = FileSystem.get(c); + logFiles = 
BackupClientUtil.getFiles(fs, logDir, logFiles, filter); + logFiles = BackupClientUtil.getFiles(fs, oldLogDir, logFiles, filter); + return logFiles; + } + + public static String join(TableName[] names) { + StringBuilder sb = new StringBuilder(); + String sep = BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND; + for (TableName s : names) { + sb.append(sep).append(s.getNameAsString()); + } + return sb.toString(); + } + + public static TableName[] parseTableNames(String tables) { + if (tables == null) { + return null; + } + String[] tableArray = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + + TableName[] ret = new TableName[tableArray.length]; + for (int i = 0; i < tableArray.length; i++) { + ret[i] = TableName.valueOf(tableArray[i]); + } + return ret; + } + + public static void cleanupBackupData(BackupInfo context, Configuration conf) + throws IOException + { + cleanupHLogDir(context, conf); + cleanupTargetDir(context, conf); + } + + /** + * Clean up directories which are generated when DistCp copying hlogs. 
+ * @throws IOException + */ + private static void cleanupHLogDir(BackupInfo backupContext, Configuration conf) + throws IOException { + + String logDir = backupContext.getHLogTargetDir(); + if (logDir == null) { + LOG.warn("No log directory specified for " + backupContext.getBackupId()); + return; + } + + Path rootPath = new Path(logDir).getParent(); + FileSystem fs = FileSystem.get(rootPath.toUri(), conf); + FileStatus[] files = FSUtils.listStatus(fs, rootPath); + if (files == null) { + return; + } + for (FileStatus file : files) { + LOG.debug("Delete log files: " + file.getPath().getName()); + if(!FSUtils.delete(fs, file.getPath(), true)) { + LOG.warn("Could not delete files in "+ file.getPath()); + }; + } + } + + /** + * Clean up the data at target directory + */ + private static void cleanupTargetDir(BackupInfo backupContext, Configuration conf) { + try { + // clean up the data at target directory + LOG.debug("Trying to cleanup up target dir : " + backupContext.getBackupId()); + String targetDir = backupContext.getTargetRootDir(); + if (targetDir == null) { + LOG.warn("No target directory specified for " + backupContext.getBackupId()); + return; + } + + FileSystem outputFs = + FileSystem.get(new Path(backupContext.getTargetRootDir()).toUri(), conf); + + for (TableName table : backupContext.getTables()) { + Path targetDirPath = + new Path(HBackupFileSystem.getTableBackupDir(backupContext.getTargetRootDir(), + backupContext.getBackupId(), table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done."); + } else { + LOG.info("No data has been found in " + targetDirPath.toString() + "."); + } + + Path tableDir = targetDirPath.getParent(); + FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir); + if (backups == null || backups.length == 0) { + if(outputFs.delete(tableDir, true)){ + LOG.debug(tableDir.toString() + " is empty, remove it."); + } else { + LOG.warn("Could not delete "+ 
tableDir); + } + } + } + + } catch (IOException e1) { + LOG.error("Cleaning up backup data of " + backupContext.getBackupId() + " at " + + backupContext.getTargetRootDir() + " failed due to " + e1.getMessage() + "."); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java new file mode 100644 index 0000000..ccea8d8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupSet.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.util; +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +/** + * Backup set is a named group of HBase tables, + * which are managed together by Backup/Restore + * framework. 
Instead of using list of tables in backup or restore + * operation, one can use set's name instead. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class BackupSet { + private final String name; + private final List<TableName> tables; + + public BackupSet(String name, List<TableName> tables) { + this.name = name; + this.tables = tables; + } + + public String getName() { + return name; + } + + public List<TableName> getTables() { + return tables; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(name).append("={"); + for (int i = 0; i < tables.size(); i++) { + sb.append(tables.get(i)); + if (i < tables.size() - 1) { + sb.append(","); + } + } + sb.append("}"); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/LogUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/LogUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/LogUtils.java new file mode 100644 index 0000000..0a60b5b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/LogUtils.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.util;

import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/**
 * Logging helpers. All members are static; the class cannot be instantiated.
 */
@InterfaceAudience.Private
public final class LogUtils {

  private LogUtils() {
  }

  /**
   * Disables ZooKeeper and HBase client logging. It sets the log4j level of the
   * {@code org.apache.zookeeper}, {@code org.apache.hadoop.hbase.zookeeper} and
   * {@code org.apache.hadoop.hbase.client} loggers to {@link Level#OFF}, in that order.
   *
   * @param log not used by this method
   */
  public static void disableZkAndClientLoggers(Log log) {
    // Silence these loggers so they do not mess up command output:
    // ZooKeeper itself, HBase's ZooKeeper tooling, and the HBase client.
    String[] silencedLoggers = {
      "org.apache.zookeeper",
      "org.apache.hadoop.hbase.zookeeper",
      "org.apache.hadoop.hbase.client"
    };
    for (String loggerName : silencedLoggers) {
      Logger.getLogger(loggerName).setLevel(Level.OFF);
    }
  }
}