Repository: hbase

Updated Branches:
  refs/heads/HBASE-7912 909c4efa8 -> 914d162ca
HBASE-16861 Rename Service to Task (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/914d162c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/914d162c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/914d162c

Branch: refs/heads/HBASE-7912
Commit: 914d162ca163c818654eb912fd2c42e6ea58fa72
Parents: 909c4ef
Author: tedyu <yuzhih...@gmail.com>
Authored: Mon Oct 17 11:59:54 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Mon Oct 17 11:59:54 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/backup/BackupCopyService.java  |  53 ---
 .../hadoop/hbase/backup/BackupCopyTask.java     |  53 +++
 .../backup/BackupRestoreServerFactory.java      |  24 +-
 .../hadoop/hbase/backup/RestoreService.java     |  50 ---
 .../apache/hadoop/hbase/backup/RestoreTask.java |  50 +++
 .../backup/impl/FullTableBackupClient.java      |   4 +-
 .../impl/IncrementalTableBackupClient.java      |   4 +-
 .../mapreduce/MapReduceBackupCopyService.java   | 349 -------------------
 .../mapreduce/MapReduceBackupCopyTask.java      | 349 +++++++++++++++++++
 .../mapreduce/MapReduceRestoreService.java      | 171 ---------
 .../backup/mapreduce/MapReduceRestoreTask.java  | 171 +++++++++
 .../hbase/backup/util/RestoreServerUtil.java    |   6 +-
 12 files changed, 642 insertions(+), 642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java
deleted file mode 100644
index 6c70123..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyService.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.hadoop.hbase.backup; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface BackupCopyService extends Configurable { - - /** - * Copy backup data - * @param backupContext - context - * @param backupManager - manager - * @param conf - configuration - * @param copyType - copy type - * @param options - array of options (implementation-specific) - * @return result (0 - success) - * @throws IOException - */ - public int copy(BackupInfo backupContext, BackupManager backupManager, Configuration conf, - BackupType copyType, String[] options) throws IOException; - - - /** - * Cancel copy job - * @param jobHandler - copy job handler - * @throws IOException - */ - public void cancelCopyJob(String jobHandler) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java new file mode 100644 index 0000000..ba23bd4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupCopyTask.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface BackupCopyTask extends Configurable { + + /** + * Copy backup data task + * @param backupContext - context + * @param backupManager - manager + * @param conf - configuration + * @param copyType - copy type + * @param options - array of options (implementation-specific) + * @return result (0 - success) + * @throws IOException + */ + public int copy(BackupInfo backupContext, BackupManager backupManager, Configuration conf, + BackupType copyType, String[] options) throws IOException; + + + /** + * Cancel copy job + * @param jobHandler - copy job handler + * @throws IOException + */ + public void cancelCopyJob(String jobHandler) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java index 7644a4d..06e367f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyService; -import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreService; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyTask; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreTask; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.util.ReflectionUtils; @@ -40,11 +40,11 @@ public final class BackupRestoreServerFactory { * @param conf - configuration * @return backup restore service instance */ - public static RestoreService getRestoreService(Configuration conf) { - Class<? extends RestoreService> cls = - conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreService.class, - RestoreService.class); - RestoreService service = ReflectionUtils.newInstance(cls, conf); + public static RestoreTask getRestoreService(Configuration conf) { + Class<? extends RestoreTask> cls = + conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreTask.class, + RestoreTask.class); + RestoreTask service = ReflectionUtils.newInstance(cls, conf); service.setConf(conf); return service; } @@ -54,11 +54,11 @@ public final class BackupRestoreServerFactory { * @param conf - configuration * @return backup copy service */ - public static BackupCopyService getBackupCopyService(Configuration conf) { - Class<? 
extends BackupCopyService> cls = - conf.getClass(HBASE_BACKUP_COPY_IMPL_CLASS, MapReduceBackupCopyService.class, - BackupCopyService.class); - BackupCopyService service = ReflectionUtils.newInstance(cls, conf);; + public static BackupCopyTask getBackupCopyService(Configuration conf) { + Class<? extends BackupCopyTask> cls = + conf.getClass(HBASE_BACKUP_COPY_IMPL_CLASS, MapReduceBackupCopyTask.class, + BackupCopyTask.class); + BackupCopyTask service = ReflectionUtils.newInstance(cls, conf);; service.setConf(conf); return service; } http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java deleted file mode 100644 index 2da98c2..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving - -/** - * Backup restore service interface - * Concrete implementation is provided by backup provider. - */ - -public interface RestoreService extends Configurable{ - - /** - * Run restore operation - * @param dirPaths - path array of WAL log directories - * @param fromTables - from tables - * @param toTables - to tables - * @param fullBackupRestore - full backup restore - * @throws IOException - */ - public void run(Path[] dirPaths, TableName[] fromTables, - TableName[] toTables, boolean fullBackupRestore) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java new file mode 100644 index 0000000..fa4ed3a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreTask.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +/** + * Backup restore task interface + * Concrete implementation is provided by backup provider. + */ + +public interface RestoreTask extends Configurable{ + + /** + * Run restore task + * @param dirPaths - path array of WAL log directories + * @param fromTables - from tables + * @param toTables - to tables + * @param fullBackupRestore - full backup restore + * @throws IOException + */ + public void run(Path[] dirPaths, TableName[] fromTables, + TableName[] toTables, boolean fullBackupRestore) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index 5a8b941..ce2aea9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupCopyService; +import org.apache.hadoop.hbase.backup.BackupCopyTask; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; @@ -266,7 +266,7 @@ public class FullTableBackupClient { // call ExportSnapshot to copy files based on hbase snapshot for backup // ExportSnapshot only support single snapshot export, need loop for multiple tables case - BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf); + BackupCopyTask copyService = BackupRestoreServerFactory.getBackupCopyService(conf); // number of snapshots matches number of tables float numOfSnapshots = backupContext.getSnapshotNames().size(); http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 0a8d14d..8acace0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupCopyService; +import org.apache.hadoop.hbase.backup.BackupCopyTask; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; @@ -114,7 +114,7 @@ public class IncrementalTableBackupClient { String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]); strArr[strArr.length - 1] = backupContext.getHLogTargetDir(); - BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf); + BackupCopyTask copyService = BackupRestoreServerFactory.getBackupCopyService(conf); int counter = 0; int MAX_ITERAIONS = 2; while (counter++ < MAX_ITERAIONS) { http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java deleted file mode 100644 index a058ebc..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.math.BigDecimal; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupCopyService; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import org.apache.hadoop.hbase.backup.util.BackupServerUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.snapshot.ExportSnapshot; -import org.apache.hadoop.mapreduce.Cluster; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.tools.DistCp; -import org.apache.hadoop.tools.DistCpConstants; -import org.apache.hadoop.tools.DistCpOptions; -import org.apache.zookeeper.KeeperException.NoNodeException; -/** - * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot, - * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper - * implementation. The other is copying for incremental log files, which bases on extending - * DistCp's function with copy progress reporting to ZooKeeper implementation. - * - * For now this is only a wrapper. The other features such as progress and increment backup will be - * implemented in future jira - */ - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MapReduceBackupCopyService implements BackupCopyService { - private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class); - - private Configuration conf; - // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024; - - // Accumulated progress within the whole backup process for the copy operation - private float progressDone = 0.1f; - private long bytesCopied = 0; - private static float INIT_PROGRESS = 0.1f; - - // The percentage of the current copy task within the whole task if multiple time copies are - // needed. The default value is 100%, which means only 1 copy task for the whole. - private float subTaskPercntgInWholeTask = 1f; - - public MapReduceBackupCopyService() { - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - /** - * Get the current copy task percentage within the whole task if multiple copies are needed. - * @return the current copy task percentage - */ - public float getSubTaskPercntgInWholeTask() { - return subTaskPercntgInWholeTask; - } - - /** - * Set the current copy task percentage within the whole task if multiple copies are needed. 
Must - * be called before calling - * {@link #copy(BackupHandler, Configuration, Type, String[])} - * @param subTaskPercntgInWholeTask The percentage of the copy subtask - */ - public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) { - this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask; - } - - class SnapshotCopy extends ExportSnapshot { - private BackupInfo backupContext; - private TableName table; - - public SnapshotCopy(BackupInfo backupContext, TableName table) { - super(); - this.backupContext = backupContext; - this.table = table; - } - - public TableName getTable() { - return this.table; - } - } - - /** - * Update the ongoing backup with new progress. - * @param backupContext backup context - * - * @param newProgress progress - * @param bytesCopied bytes copied - * @throws NoNodeException exception - */ - static void updateProgress(BackupInfo backupContext, BackupManager backupManager, - int newProgress, long bytesCopied) throws IOException { - // compose the new backup progress data, using fake number for now - String backupProgressData = newProgress + "%"; - - backupContext.setProgress(newProgress); - backupManager.updateBackupInfo(backupContext); - LOG.debug("Backup progress data \"" + backupProgressData - + "\" has been updated to hbase:backup for " + backupContext.getBackupId()); - } - - // Extends DistCp for progress updating to hbase:backup - // during backup. Using DistCpV2 (MAPREDUCE-2765). - // Simply extend it and override execute() method to get the - // Job reference for progress updating. - // Only the argument "src1, [src2, [...]] dst" is supported, - // no more DistCp options. - class BackupDistCp extends DistCp { - - private BackupInfo backupContext; - private BackupManager backupManager; - - public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupContext, - BackupManager backupManager) - throws Exception { - super(conf, options); - this.backupContext = backupContext; - this.backupManager = backupManager; - } - - @Override - public Job execute() throws Exception { - - // reflection preparation for private methods and fields - Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class; - Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath"); - Method methodCreateJob = classDistCp.getDeclaredMethod("createJob"); - Method methodCreateInputFileListing = - classDistCp.getDeclaredMethod("createInputFileListing", Job.class); - Method methodCleanup = classDistCp.getDeclaredMethod("cleanup"); - - Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions"); - Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder"); - Field fieldJobFS = classDistCp.getDeclaredField("jobFS"); - Field fieldSubmitted = classDistCp.getDeclaredField("submitted"); - - methodCreateMetaFolderPath.setAccessible(true); - methodCreateJob.setAccessible(true); - methodCreateInputFileListing.setAccessible(true); - methodCleanup.setAccessible(true); - - fieldInputOptions.setAccessible(true); - fieldMetaFolder.setAccessible(true); - fieldJobFS.setAccessible(true); - fieldSubmitted.setAccessible(true); - - // execute() logic starts here - assert fieldInputOptions.get(this) != null; - assert getConf() != null; - - Job job = null; - try { - synchronized (this) { - // Don't cleanup while we are setting up. 
- fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this)); - fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf())); - job = (Job) methodCreateJob.invoke(this); - } - methodCreateInputFileListing.invoke(this, job); - - // Get the total length of the source files - List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths(); - - long totalSrcLgth = 0; - for (Path aSrc : srcs) { - totalSrcLgth += BackupServerUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc); - } - - // submit the copy job - job.submit(); - fieldSubmitted.set(this, true); - - // after submit the MR job, set its handler in backup handler for cancel process - // this.backupHandler.copyJob = job; - - // Update the copy progress to ZK every 0.5s if progress value changed - int progressReportFreq = - this.getConf().getInt("hbase.backup.progressreport.frequency", 500); - float lastProgress = progressDone; - while (!job.isComplete()) { - float newProgress = - progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); - - if (newProgress > lastProgress) { - - BigDecimal progressData = - new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); - String newProgressStr = progressData + "%"; - LOG.info("Progress: " + newProgressStr); - updateProgress(backupContext, backupManager, progressData.intValue(), - bytesCopied); - LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr - + ".\""); - lastProgress = newProgress; - } - Thread.sleep(progressReportFreq); - } - // update the progress data after copy job complete - float newProgress = - progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); - BigDecimal progressData = - new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); - - String newProgressStr = progressData + "%"; - LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask + - " mapProgress: " + job.mapProgress()); - - // accumulate the overall backup progress - progressDone = newProgress; - bytesCopied += totalSrcLgth; - - updateProgress(backupContext, backupManager, progressData.intValue(), - bytesCopied); - LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr - + " - " + bytesCopied + " bytes copied.\""); - } catch (Throwable t) { - LOG.error("distcp " + job.getJobID() + " encountered error", t); - throw t; - } finally { - if (!fieldSubmitted.getBoolean(this)) { - methodCleanup.invoke(this); - } - } - - String jobID = job.getJobID().toString(); - job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); - - LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + - job.isSuccessful()); - Counters ctrs = job.getCounters(); - LOG.debug(ctrs); - if (job.isComplete() && !job.isSuccessful()) { - throw new Exception("DistCp job-id: " + jobID + " failed"); - } - - return job; - } - - } - - - /** - * Do backup copy based on different types. 
- * @param context The backup context - * @param conf The hadoop configuration - * @param copyType The backup copy type - * @param options Options for customized ExportSnapshot or DistCp - * @throws Exception exception - */ - @Override - public int copy(BackupInfo context, BackupManager backupManager, Configuration conf, - BackupType copyType, String[] options) throws IOException { - int res = 0; - - try { - if (copyType == BackupType.FULL) { - SnapshotCopy snapshotCp = - new SnapshotCopy(context, context.getTableBySnapshot(options[1])); - LOG.debug("Doing SNAPSHOT_COPY"); - // Make a new instance of conf to be used by the snapshot copy class. - snapshotCp.setConf(new Configuration(conf)); - res = snapshotCp.run(options); - - } else if (copyType == BackupType.INCREMENTAL) { - LOG.debug("Doing COPY_TYPE_DISTCP"); - setSubTaskPercntgInWholeTask(1f); - - BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context, - backupManager); - // Handle a special case where the source file is a single file. - // In this case, distcp will not create the target dir. It just take the - // target as a file name and copy source file to the target (as a file name). - // We need to create the target dir before run distcp. - LOG.debug("DistCp options: " + Arrays.toString(options)); - Path dest = new Path(options[options.length-1]); - FileSystem destfs = dest.getFileSystem(conf); - if (!destfs.exists(dest)) { - destfs.mkdirs(dest); - } - res = distcp.run(options); - } - return res; - - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - public void cancelCopyJob(String jobId) throws IOException { - JobID id = JobID.forName(jobId); - Cluster cluster = new Cluster(getConf()); - try { - Job job = cluster.getJob(id); - if (job == null) { - LOG.error("No job found for " + id); - // should we throw exception - } - if (job.isComplete() || job.isRetired()) { - return; - } - - job.killJob(); - LOG.debug("Killed job " + id); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java new file mode 100644 index 0000000..cdde89e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyTask.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupCopyTask; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.util.BackupServerUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.zookeeper.KeeperException.NoNodeException; +/** + * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot, + * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper + * implementation. The other is copying for incremental log files, which bases on extending + * DistCp's function with copy progress reporting to ZooKeeper implementation. + * + * For now this is only a wrapper. The other features such as progress and increment backup will be + * implemented in future jira + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MapReduceBackupCopyTask implements BackupCopyTask { + private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyTask.class); + + private Configuration conf; + // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024; + + // Accumulated progress within the whole backup process for the copy operation + private float progressDone = 0.1f; + private long bytesCopied = 0; + private static float INIT_PROGRESS = 0.1f; + + // The percentage of the current copy task within the whole task if multiple time copies are + // needed. The default value is 100%, which means only 1 copy task for the whole. + private float subTaskPercntgInWholeTask = 1f; + + public MapReduceBackupCopyTask() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Get the current copy task percentage within the whole task if multiple copies are needed. + * @return the current copy task percentage + */ + public float getSubTaskPercntgInWholeTask() { + return subTaskPercntgInWholeTask; + } + + /** + * Set the current copy task percentage within the whole task if multiple copies are needed. 
Must + * be called before calling + * {@link #copy(BackupHandler, Configuration, Type, String[])} + * @param subTaskPercntgInWholeTask The percentage of the copy subtask + */ + public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) { + this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask; + } + + class SnapshotCopy extends ExportSnapshot { + private BackupInfo backupContext; + private TableName table; + + public SnapshotCopy(BackupInfo backupContext, TableName table) { + super(); + this.backupContext = backupContext; + this.table = table; + } + + public TableName getTable() { + return this.table; + } + } + + /** + * Update the ongoing backup with new progress. + * @param backupContext backup context + * + * @param newProgress progress + * @param bytesCopied bytes copied + * @throws NoNodeException exception + */ + static void updateProgress(BackupInfo backupContext, BackupManager backupManager, + int newProgress, long bytesCopied) throws IOException { + // compose the new backup progress data, using fake number for now + String backupProgressData = newProgress + "%"; + + backupContext.setProgress(newProgress); + backupManager.updateBackupInfo(backupContext); + LOG.debug("Backup progress data \"" + backupProgressData + + "\" has been updated to hbase:backup for " + backupContext.getBackupId()); + } + + // Extends DistCp for progress updating to hbase:backup + // during backup. Using DistCpV2 (MAPREDUCE-2765). + // Simply extend it and override execute() method to get the + // Job reference for progress updating. + // Only the argument "src1, [src2, [...]] dst" is supported, + // no more DistCp options. + class BackupDistCp extends DistCp { + + private BackupInfo backupContext; + private BackupManager backupManager; + + public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupContext, + BackupManager backupManager) + throws Exception { + super(conf, options); + this.backupContext = backupContext; + this.backupManager = backupManager; + } + + @Override + public Job execute() throws Exception { + + // reflection preparation for private methods and fields + Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class; + Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath"); + Method methodCreateJob = classDistCp.getDeclaredMethod("createJob"); + Method methodCreateInputFileListing = + classDistCp.getDeclaredMethod("createInputFileListing", Job.class); + Method methodCleanup = classDistCp.getDeclaredMethod("cleanup"); + + Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions"); + Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder"); + Field fieldJobFS = classDistCp.getDeclaredField("jobFS"); + Field fieldSubmitted = classDistCp.getDeclaredField("submitted"); + + methodCreateMetaFolderPath.setAccessible(true); + methodCreateJob.setAccessible(true); + methodCreateInputFileListing.setAccessible(true); + methodCleanup.setAccessible(true); + + fieldInputOptions.setAccessible(true); + fieldMetaFolder.setAccessible(true); + fieldJobFS.setAccessible(true); + fieldSubmitted.setAccessible(true); + + // execute() logic starts here + assert fieldInputOptions.get(this) != null; + assert getConf() != null; + + Job job = null; + try { + synchronized (this) { + // Don't cleanup while we are setting up. 
+ fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this)); + fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf())); + job = (Job) methodCreateJob.invoke(this); + } + methodCreateInputFileListing.invoke(this, job); + + // Get the total length of the source files + List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths(); + + long totalSrcLgth = 0; + for (Path aSrc : srcs) { + totalSrcLgth += BackupServerUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc); + } + + // submit the copy job + job.submit(); + fieldSubmitted.set(this, true); + + // after submit the MR job, set its handler in backup handler for cancel process + // this.backupHandler.copyJob = job; + + // Update the copy progress to ZK every 0.5s if progress value changed + int progressReportFreq = + this.getConf().getInt("hbase.backup.progressreport.frequency", 500); + float lastProgress = progressDone; + while (!job.isComplete()) { + float newProgress = + progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); + + if (newProgress > lastProgress) { + + BigDecimal progressData = + new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); + String newProgressStr = progressData + "%"; + LOG.info("Progress: " + newProgressStr); + updateProgress(backupContext, backupManager, progressData.intValue(), + bytesCopied); + LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr + + ".\""); + lastProgress = newProgress; + } + Thread.sleep(progressReportFreq); + } + // update the progress data after copy job complete + float newProgress = + progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); + BigDecimal progressData = + new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); + + String newProgressStr = progressData + "%"; + LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask + + " mapProgress: " + job.mapProgress()); + + // accumulate the overall backup progress + progressDone = newProgress; + bytesCopied += totalSrcLgth; + + updateProgress(backupContext, backupManager, progressData.intValue(), + bytesCopied); + LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr + + " - " + bytesCopied + " bytes copied.\""); + } catch (Throwable t) { + LOG.error("distcp " + job.getJobID() + " encountered error", t); + throw t; + } finally { + if (!fieldSubmitted.getBoolean(this)) { + methodCleanup.invoke(this); + } + } + + String jobID = job.getJobID().toString(); + job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); + + LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " " + + job.isSuccessful()); + Counters ctrs = job.getCounters(); + LOG.debug(ctrs); + if (job.isComplete() && !job.isSuccessful()) { + throw new Exception("DistCp job-id: " + jobID + " failed"); + } + + return job; + } + + } + + + /** + * Do backup copy based on different types. 
+ * @param context The backup context + * @param conf The hadoop configuration + * @param copyType The backup copy type + * @param options Options for customized ExportSnapshot or DistCp + * @throws Exception exception + */ + @Override + public int copy(BackupInfo context, BackupManager backupManager, Configuration conf, + BackupType copyType, String[] options) throws IOException { + int res = 0; + + try { + if (copyType == BackupType.FULL) { + SnapshotCopy snapshotCp = + new SnapshotCopy(context, context.getTableBySnapshot(options[1])); + LOG.debug("Doing SNAPSHOT_COPY"); + // Make a new instance of conf to be used by the snapshot copy class. + snapshotCp.setConf(new Configuration(conf)); + res = snapshotCp.run(options); + + } else if (copyType == BackupType.INCREMENTAL) { + LOG.debug("Doing COPY_TYPE_DISTCP"); + setSubTaskPercntgInWholeTask(1f); + + BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context, + backupManager); + // Handle a special case where the source file is a single file. + // In this case, distcp will not create the target dir. It just take the + // target as a file name and copy source file to the target (as a file name). + // We need to create the target dir before run distcp. + LOG.debug("DistCp options: " + Arrays.toString(options)); + Path dest = new Path(options[options.length-1]); + FileSystem destfs = dest.getFileSystem(conf); + if (!destfs.exists(dest)) { + destfs.mkdirs(dest); + } + res = distcp.run(options); + } + return res; + + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void cancelCopyJob(String jobId) throws IOException { + JobID id = JobID.forName(jobId); + Cluster cluster = new Cluster(getConf()); + try { + Job job = cluster.getJob(id); + if (job == null) { + LOG.error("No job found for " + id); + // should we throw exception + } + if (job.isComplete() || job.isRetired()) { + return; + } + + job.killJob(); + LOG.debug("Killed job " + id); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java deleted file mode 100644 index 18c1f86..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import java.io.IOException; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.RestoreService; -import org.apache.hadoop.hbase.backup.util.BackupServerUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.Tool; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MapReduceRestoreService implements RestoreService { - public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class); - - private Tool player; - private Configuration conf; - - public MapReduceRestoreService() { - } - - @Override - public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, - boolean fullBackupRestore) throws IOException { - - String bulkOutputConfKey; - - if (fullBackupRestore) { - player = new HFileSplitter(); - bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY; - } else { - player = new WALPlayer(); - bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY; - } - // Player reads all files in arbitrary directory structure and creates - // a Map task for each file - String dirs = StringUtils.join(dirPaths, ","); - - if (LOG.isDebugEnabled()) { - LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") - + " backup from directory " + dirs + " from hbase tables " - + BackupServerUtil.join(tableNames) + " to tables " - + BackupServerUtil.join(newTableNames)); - } - - for (int i = 0; i < tableNames.length; i++) { - - LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); - - Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); - Configuration conf = getConf(); - conf.set(bulkOutputConfKey, bulkOutputPath.toString()); - String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; - - int result = 0; - int loaderResult = 0; - try { - - player.setConf(getConf()); - result = player.run(playerArgs); - if (succeeded(result)) { - // do bulk load - LoadIncrementalHFiles loader = createLoader(); - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring HFiles from directory " + bulkOutputPath); - } - String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() }; - loaderResult = loader.run(args); - - if (failed(loaderResult)) { - throw new IOException("Can not restore from backup directory " + dirs - + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); - } - } else { - throw new IOException("Can not restore from backup directory " + dirs - + " (check Hadoop/MR and HBase logs). 
Player return code =" + result); - } - LOG.debug("Restore Job finished:" + result); - } catch (Exception e) { - throw new IOException("Can not restore from backup directory " + dirs - + " (check Hadoop and HBase logs) ", e); - } - - } - } - - private String getFileNameCompatibleString(TableName table) { - return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); - } - - private boolean failed(int result) { - return result != 0; - } - - private boolean succeeded(int result) { - return result == 0; - } - - private LoadIncrementalHFiles createLoader() throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // <name>hbase.rpc.timeout</name> <value>600000</value> - // calculates - Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(getConf()); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. - conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(conf); - } catch (Exception e) { - throw new IOException(e); - } - return loader; - } - - private Path getBulkOutputDir(String tableName) throws IOException { - Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); - String tmp = - conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = - new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" - + EnvironmentEdgeManager.currentTime()); - fs.deleteOnExit(path); - return path; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java new file mode 100644 index 0000000..8d9f5b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.RestoreTask; +import org.apache.hadoop.hbase.backup.util.BackupServerUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.Tool; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MapReduceRestoreTask implements RestoreTask { + public static final Log LOG = LogFactory.getLog(MapReduceRestoreTask.class); + + private Tool player; + private Configuration conf; + + public MapReduceRestoreTask() { + } + + @Override + public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, + boolean fullBackupRestore) throws IOException { + + String bulkOutputConfKey; + + if (fullBackupRestore) { + player = new HFileSplitter(); + bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY; + } else { + player = new WALPlayer(); + bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY; + } + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String dirs = StringUtils.join(dirPaths, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") + + " backup from directory " + dirs + " from hbase tables " + + BackupServerUtil.join(tableNames) + " to tables " + + BackupServerUtil.join(newTableNames)); + } + + for (int i = 0; i < tableNames.length; i++) { + + LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); + + Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + + int result = 0; + int loaderResult = 0; + try { + + player.setConf(getConf()); + result = player.run(playerArgs); + if (succeeded(result)) { + // do bulk load + LoadIncrementalHFiles loader = createLoader(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring HFiles from directory " + bulkOutputPath); + } + String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() }; + loaderResult = loader.run(args); + + if (failed(loaderResult)) { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); + } + } else { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop/MR and HBase logs). 
Player return code =" + result); + } + LOG.debug("Restore Job finished:" + result); + } catch (Exception e) { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop and HBase logs) ", e); + } + + } + } + + private String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); + } + + private boolean failed(int result) { + return result != 0; + } + + private boolean succeeded(int result) { + return result == 0; + } + + private LoadIncrementalHFiles createLoader() throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // <name>hbase.rpc.timeout</name> <value>600000</value> + // calculates + Integer milliSecInHour = 3600000; + Configuration conf = new Configuration(getConf()); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); + + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(conf); + } catch (Exception e) { + throw new IOException(e); + } + return loader; + } + + private Path getBulkOutputDir(String tableName) throws IOException { + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); + fs.deleteOnExit(path); + return path; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/914d162c/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java index cc2ecdf..c317844 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; -import org.apache.hadoop.hbase.backup.RestoreService; +import org.apache.hadoop.hbase.backup.RestoreTask; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -241,7 +241,7 @@ public class RestoreServerUtil { LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor); } } - RestoreService restoreService = + RestoreTask restoreService = BackupRestoreServerFactory.getRestoreService(conf); restoreService.run(logDirs, tableNames, newTableNames, false); @@ -478,7 +478,7 @@ public class RestoreServerUtil { // Run restore service Path[] dirs = new Path[regionPathList.size()]; 
     regionPathList.toArray(dirs);
-    RestoreService restoreService =
+    RestoreTask restoreService =
         BackupRestoreServerFactory.getRestoreService(conf);
     restoreService.run(dirs, new TableName[] { tableName }, new TableName[] { newTableName },
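----------------------------------------------------------------------

Editor's note: for readers following the rename, below is a minimal caller-side
sketch of the renamed API. It is an illustration against this commit, not code
from it; the backup context, manager, WAL directories, and table arrays are
assumed to be prepared by the client classes above (FullTableBackupClient,
IncrementalTableBackupClient, RestoreServerUtil), and the hypothetical class
and method names are the editor's. Note that the factory methods keep their
pre-rename names (getRestoreService / getBackupCopyService), as the
BackupRestoreServerFactory diff shows.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyTask;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.RestoreTask;
import org.apache.hadoop.hbase.backup.impl.BackupManager;

/** Editor's sketch (hypothetical class): exercises the renamed interfaces. */
public class RenamedTaskUsage {

  /** Copy phase: BackupCopyService is now BackupCopyTask. */
  static int runCopy(Configuration conf, BackupInfo backupContext,
      BackupManager backupManager, String[] distcpArgs) throws IOException {
    // The factory returns MapReduceBackupCopyTask unless another class is
    // configured under HBASE_BACKUP_COPY_IMPL_CLASS; it also calls setConf()
    // on the instance before returning it, so the caller need not.
    BackupCopyTask copyTask = BackupRestoreServerFactory.getBackupCopyService(conf);
    // Returns 0 on success, per the interface javadoc above.
    return copyTask.copy(backupContext, backupManager, conf,
        BackupType.INCREMENTAL, distcpArgs);
  }

  /** Restore phase: RestoreService is now RestoreTask. */
  static void runRestore(Configuration conf, Path[] walDirs,
      TableName[] fromTables, TableName[] toTables) throws IOException {
    // MapReduceRestoreTask by default, overridable via HBASE_INCR_RESTORE_IMPL_CLASS.
    RestoreTask restoreTask = BackupRestoreServerFactory.getRestoreService(conf);
    restoreTask.run(walDirs, fromTables, toTables, false /* incremental, not full */);
  }
}

Because both lookups go through conf.getClass(), a test or an alternative copy
or restore engine can be swapped in through configuration without touching the
client code above.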