gvprathyusha6 commented on code in PR #8248: URL: https://github.com/apache/hbase/pull/8248#discussion_r3475934856
########## hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapreduceHFileArchiver.java: ########## @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.backup.FailedArchiveException; +import org.apache.hadoop.hbase.client.RegionInfo; +import 
org.apache.hadoop.hbase.snapshot.RestoreSnapshotArchiver; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * MapReduce-local archiver used by {@link + * org.apache.hadoop.hbase.mapreduce.MapreduceRestoreSnapshotHelper}. It mirrors the server-side + * {@code HFileArchiver} but lives in the MapReduce module so the snapshot-scanning path can be + * evolved independently. It is injected into the shared {@link + * org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper} restore/clone logic via {@link + * RestoreSnapshotArchiver}. + */ [email protected] +public class MapreduceHFileArchiver implements RestoreSnapshotArchiver { + + private static final Logger LOG = LoggerFactory.getLogger(MapreduceHFileArchiver.class); + private static final String SEPARATOR = "."; + + /** Number of retries in case of fs operation failure */ + private static final int DEFAULT_RETRIES_NUMBER = 3; + + private static final Function<File, Path> FUNC_FILE_TO_PATH = new Function<File, Path>() { + @Override + public Path apply(File file) { + return file == null ? null : file.getPath(); + } + }; + + /** Returns True if the Region exits in the filesystem. 
*/ + public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info) + throws IOException { + Path rootDir = CommonFSUtils.getRootDir(conf); + Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, info); + return fs.exists(regionDir); + } + + /** + * Cleans up all the files for a HRegion by archiving the HFiles to the archive directory + * @param conf the configuration to use + * @param fs the file system object + * @param info RegionInfo for region to be deleted + * @param rootDir {@link Path} to the root directory where hbase files are stored (for building + * the archive path) + * @param tableDir {@link Path} to where the table is being stored (for building the archive path) + */ + @Override + public void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info, Path rootDir, + Path tableDir) throws IOException { + archiveRegion(conf, fs, rootDir, tableDir, FSUtils.getRegionDirFromRootDir(rootDir, info)); + } + + /** + * Remove an entire region from the table directory via archiving the region's hfiles. + * @param fs {@link FileSystem} from which to remove the region + * @param rootdir {@link Path} to the root directory where hbase files are stored (for building + * the archive path) + * @param tableDir {@link Path} to where the table is being stored (for building the archive + * path) + * @param regionDir {@link Path} to where a region is being stored (for building the archive path) + * @return <tt>true</tt> if the region was successfully deleted. <tt>false</tt> if the filesystem + * operations could not complete. 
+ * @throws IOException if the request cannot be completed + */ + public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir, + Path tableDir, Path regionDir) throws IOException { + // otherwise, we archive the files + // make sure we can archive + if (tableDir == null || regionDir == null) { + LOG.error("No archive directory could be found because tabledir (" + tableDir + + ") or regiondir (" + regionDir + "was null. Deleting files instead."); + if (regionDir != null) { + deleteRegionWithoutArchiving(fs, regionDir); + } + // we should have archived, but failed to. Doesn't matter if we deleted + // the archived files correctly or not. + return false; + } + + LOG.debug("ARCHIVING {}", regionDir); + + // make sure the regiondir lives under the tabledir + Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString())); + Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir, + CommonFSUtils.getTableName(tableDir), regionDir.getName()); + + FileStatusConverter getAsFile = new FileStatusConverter(fs); + // otherwise, we attempt to archive the store files + + // build collection of just the store directories to archive + Collection<File> toArchive = new ArrayList<>(); + final PathFilter dirFilter = new FSUtils.DirFilter(fs); + PathFilter nonHidden = new PathFilter() { + @Override + public boolean accept(Path file) { + return dirFilter.accept(file) && !file.getName().startsWith("."); + } + }; + FileStatus[] storeDirs = CommonFSUtils.listStatus(fs, regionDir, nonHidden); + // if there no files, we can just delete the directory and return; + if (storeDirs == null) { + LOG.debug("Directory {} empty.", regionDir); + return deleteRegionWithoutArchiving(fs, regionDir); + } + + // convert the files in the region to a File + Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add); + LOG.debug("Archiving " + toArchive); + List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive, + 
EnvironmentEdgeManager.currentTime()); + if (!failedArchive.isEmpty()) { + throw new FailedArchiveException( + "Failed to archive/delete all the files for region:" + regionDir.getName() + " into " + + regionArchiveDir + ". Something is probably awry on the filesystem.", + failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); + } + // if that was successful, then we delete the region + return deleteRegionWithoutArchiving(fs, regionDir); + } + + // We need this method instead of Threads.getNamedThreadFactory() to pass some tests. + // The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for + // new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related + // issues in some tests. + private static ThreadFactory getThreadFactory(String archiverName) { + return new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + final String name = archiverName + "-" + threadNumber.getAndIncrement(); + Thread t = new Thread(r, name); + t.setDaemon(true); + return t; + } + }; + } + + /** + * Removes from the specified region the store files of the specified column family, either by + * archiving them or outright deletion + * @param fs the filesystem where the store files live + * @param conf {@link Configuration} to examine to determine the archive directory + * @param parent Parent region hosting the store files + * @param familyDir {@link Path} to where the family is being stored + * @param family the family hosting the store files + * @throws IOException if the files could not be correctly disposed. 
+ */ + @Override + public void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent, + Path familyDir, byte[] family) throws IOException { + FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir); + if (storeFiles == null) { + LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(), + Bytes.toString(family)); + return; + } + + FileStatusConverter getAsFile = new FileStatusConverter(fs); + Collection<File> toArchive = Stream.of(storeFiles).map(getAsFile).collect(Collectors.toList()); + Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family); + + // do the actual archive + List<File> failedArchive = + resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime()); + if (!failedArchive.isEmpty()) { + throw new FailedArchiveException( + "Failed to archive/delete all the files for region:" + + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family) + " into " + + storeArchiveDir + ". Something is probably awry on the filesystem.", + failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); + } + } + + /** + * Resolve any conflict with an existing archive file via timestamp-append renaming of the + * existing file and then archive the passed in files. + * @param fs {@link FileSystem} on which to archive the files + * @param baseArchiveDir base archive directory to store the files. If any of the files to archive + * are directories, will append the name of the directory to the base + * archive directory name, creating a parallel structure. + * @param toArchive files/directories that need to be archvied + * @param start time the archiving started - used for resolving archive conflicts. + * @return the list of failed to archive files. 
+ * @throws IOException if an unexpected file operation exception occurred + */ + private static List<File> resolveAndArchive(Configuration conf, FileSystem fs, + Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException { + // Early exit if no files to archive + if (toArchive.isEmpty()) { + LOG.trace("No files to archive, returning an empty list."); + return Collections.emptyList(); + } + + LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir); + + // Ensure the archive directory exists + ensureArchiveDirectoryExists(fs, baseArchiveDir); + + // Thread-safe collection for storing failures + Queue<File> failures = new ConcurrentLinkedQueue<>(); + String startTime = Long.toString(start); + + // Separate files and directories for processing + List<File> filesOnly = new ArrayList<>(); + for (File file : toArchive) { + if (file.isFile()) { + filesOnly.add(file); + } else { + handleDirectory(conf, fs, baseArchiveDir, failures, file, start); + } + } + + // Archive files concurrently + archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime); + + return new ArrayList<>(failures); // Convert to a List for the return value + } + + private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir) + throws IOException { + if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) { + throw new IOException("Failed to create the archive directory: " + baseArchiveDir); + } + LOG.trace("Archive directory ready: {}", baseArchiveDir); + } + + private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir, Review Comment: Are all of these methods triggered as part of RestoreSnapshotHelper.copySnapshotForScanner? Can you check that, please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
