gvprathyusha6 commented on code in PR #8248:
URL: https://github.com/apache/hbase/pull/8248#discussion_r3475934856


##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapreduceHFileArchiver.java:
##########
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.backup.FailedArchiveException;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotArchiver;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * MapReduce-local archiver used by
+ * {@link org.apache.hadoop.hbase.mapreduce.MapreduceRestoreSnapshotHelper}. It mirrors the
+ * server-side {@code HFileArchiver} but lives in the MapReduce module so the snapshot-scanning
+ * path can be evolved independently. It is injected into the shared
+ * {@link org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper} restore/clone logic via
+ * {@link RestoreSnapshotArchiver}.
+ */
[email protected]
+public class MapreduceHFileArchiver implements RestoreSnapshotArchiver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MapreduceHFileArchiver.class);
+  private static final String SEPARATOR = ".";
+
+  /** Number of retries in case of fs operation failure */
+  private static final int DEFAULT_RETRIES_NUMBER = 3;
+
+  /** Null-tolerant mapping from a {@link File} to its {@link Path}: a null file maps to null. */
+  private static final Function<File, Path> FUNC_FILE_TO_PATH =
+    file -> file == null ? null : file.getPath();
+
+  /**
+   * Checks whether the directory of the given region exists in the filesystem.
+   * @param conf configuration from which the HBase root directory is looked up
+   * @param fs   filesystem to query
+   * @param info region whose directory is checked
+   * @return {@code true} if the region directory exists, {@code false} otherwise
+   * @throws IOException if looking up the root directory or querying the filesystem fails
+   */
+  public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info)
+    throws IOException {
+    Path regionDir = FSUtils.getRegionDirFromRootDir(CommonFSUtils.getRootDir(conf), info);
+    return fs.exists(regionDir);
+  }
+
+  /**
+   * Cleans up all the files for a HRegion by archiving the HFiles to the 
archive directory
+   * @param conf     the configuration to use
+   * @param fs       the file system object
+   * @param info     RegionInfo for region to be deleted
+   * @param rootDir  {@link Path} to the root directory where hbase files are 
stored (for building
+   *                 the archive path)
+   * @param tableDir {@link Path} to where the table is being stored (for 
building the archive path)
+   */
+  @Override
+  public void archiveRegion(Configuration conf, FileSystem fs, RegionInfo 
info, Path rootDir,
+    Path tableDir) throws IOException {
+    archiveRegion(conf, fs, rootDir, tableDir, 
FSUtils.getRegionDirFromRootDir(rootDir, info));
+  }
+
+  /**
+   * Remove an entire region from the table directory via archiving the 
region's hfiles.
+   * @param fs        {@link FileSystem} from which to remove the region
+   * @param rootdir   {@link Path} to the root directory where hbase files are 
stored (for building
+   *                  the archive path)
+   * @param tableDir  {@link Path} to where the table is being stored (for 
building the archive
+   *                  path)
+   * @param regionDir {@link Path} to where a region is being stored (for 
building the archive path)
+   * @return <tt>true</tt> if the region was successfully deleted. 
<tt>false</tt> if the filesystem
+   *         operations could not complete.
+   * @throws IOException if the request cannot be completed
+   */
+  public static boolean archiveRegion(Configuration conf, FileSystem fs, Path 
rootdir,
+    Path tableDir, Path regionDir) throws IOException {
+    // otherwise, we archive the files
+    // make sure we can archive
+    if (tableDir == null || regionDir == null) {
+      LOG.error("No archive directory could be found because tabledir (" + 
tableDir
+        + ") or regiondir (" + regionDir + "was null. Deleting files 
instead.");
+      if (regionDir != null) {
+        deleteRegionWithoutArchiving(fs, regionDir);
+      }
+      // we should have archived, but failed to. Doesn't matter if we deleted
+      // the archived files correctly or not.
+      return false;
+    }
+
+    LOG.debug("ARCHIVING {}", regionDir);
+
+    // make sure the regiondir lives under the tabledir
+    
Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString()));
+    Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir,
+      CommonFSUtils.getTableName(tableDir), regionDir.getName());
+
+    FileStatusConverter getAsFile = new FileStatusConverter(fs);
+    // otherwise, we attempt to archive the store files
+
+    // build collection of just the store directories to archive
+    Collection<File> toArchive = new ArrayList<>();
+    final PathFilter dirFilter = new FSUtils.DirFilter(fs);
+    PathFilter nonHidden = new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        return dirFilter.accept(file) && !file.getName().startsWith(".");
+      }
+    };
+    FileStatus[] storeDirs = CommonFSUtils.listStatus(fs, regionDir, 
nonHidden);
+    // if there no files, we can just delete the directory and return;
+    if (storeDirs == null) {
+      LOG.debug("Directory {} empty.", regionDir);
+      return deleteRegionWithoutArchiving(fs, regionDir);
+    }
+
+    // convert the files in the region to a File
+    Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add);
+    LOG.debug("Archiving " + toArchive);
+    List<File> failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, 
toArchive,
+      EnvironmentEdgeManager.currentTime());
+    if (!failedArchive.isEmpty()) {
+      throw new FailedArchiveException(
+        "Failed to archive/delete all the files for region:" + 
regionDir.getName() + " into "
+          + regionArchiveDir + ". Something is probably awry on the 
filesystem.",
+        
failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList()));
+    }
+    // if that was successful, then we delete the region
+    return deleteRegionWithoutArchiving(fs, regionDir);
+  }
+
+  // Threads.getNamedThreadFactory() is deliberately not used here: unlike this factory it fixes
+  // the ThreadGroup for new threads, which leads to ThreadGroup related issues in some tests.
+  // The threads created here are daemons named "<archiverName>-<n>", with n counting up from 1
+  // per factory instance.
+  private static ThreadFactory getThreadFactory(String archiverName) {
+    final AtomicInteger nextId = new AtomicInteger(1);
+    return runnable -> {
+      Thread worker = new Thread(runnable, archiverName + "-" + nextId.getAndIncrement());
+      worker.setDaemon(true);
+      return worker;
+    };
+  }
+
+  /**
+   * Removes from the specified region the store files of the specified column family, either by
+   * archiving them or outright deletion. Nothing happens when the family directory lists no files.
+   * @param fs        the filesystem where the store files live
+   * @param conf      {@link Configuration} to examine to determine the archive directory
+   * @param parent    Parent region hosting the store files
+   * @param familyDir {@link Path} to where the family is being stored
+   * @param family    the family hosting the store files
+   * @throws FailedArchiveException if some of the store files could not be archived
+   * @throws IOException            if the files could not be correctly disposed.
+   */
+  @Override
+  public void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent,
+    Path familyDir, byte[] family) throws IOException {
+    FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir);
+    if (storeFiles == null) {
+      LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(),
+        Bytes.toString(family));
+      return;
+    }
+
+    // wrap every listed store file so it can be handed to the archiving routine
+    FileStatusConverter converter = new FileStatusConverter(fs);
+    List<File> candidates = new ArrayList<>(storeFiles.length);
+    for (FileStatus storeFile : storeFiles) {
+      candidates.add(converter.apply(storeFile));
+    }
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);
+
+    // do the actual archive
+    long archiveStart = EnvironmentEdgeManager.currentTime();
+    List<File> failed = resolveAndArchive(conf, fs, storeArchiveDir, candidates, archiveStart);
+    if (failed.isEmpty()) {
+      return;
+    }
+
+    List<Path> failedPaths = failed.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList());
+    throw new FailedArchiveException("Failed to archive/delete all the files for region:"
+      + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family) + " into "
+      + storeArchiveDir + ". Something is probably awry on the filesystem.", failedPaths);
+  }
+
+  /**
+   * Resolve any conflict with an existing archive file via timestamp-append renaming of the
+   * existing file and then archive the passed in files.
+   * @param conf           configuration handed on to the directory and file archiving helpers
+   * @param fs             {@link FileSystem} on which to archive the files
+   * @param baseArchiveDir base archive directory to store the files. If any of the files to
+   *                       archive are directories, will append the name of the directory to the
+   *                       base archive directory name, creating a parallel structure.
+   * @param toArchive      files/directories that need to be archived
+   * @param start          time the archiving started - used for resolving archive conflicts.
+   * @return the list of failed to archive files.
+   * @throws IOException if an unexpected file operation exception occurred
+   */
+  private static List<File> resolveAndArchive(Configuration conf, FileSystem fs,
+    Path baseArchiveDir, Collection<File> toArchive, long start) throws IOException {
+    // Nothing to do for an empty request
+    if (toArchive.isEmpty()) {
+      LOG.trace("No files to archive, returning an empty list.");
+      return Collections.emptyList();
+    }
+
+    LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);
+
+    // The archive directory has to be in place before anything is moved into it
+    ensureArchiveDirectoryExists(fs, baseArchiveDir);
+
+    // Thread-safe collection shared by the directory and the concurrent file archiving steps
+    Queue<File> failed = new ConcurrentLinkedQueue<>();
+
+    // Directories are handled right away, in iteration order; plain files are collected first
+    List<File> plainFiles = new ArrayList<>();
+    for (File candidate : toArchive) {
+      if (!candidate.isFile()) {
+        handleDirectory(conf, fs, baseArchiveDir, failed, candidate, start);
+        continue;
+      }
+      plainFiles.add(candidate);
+    }
+
+    // Archive the plain files concurrently
+    archiveFilesConcurrently(conf, baseArchiveDir, plainFiles, failed, String.valueOf(start));
+
+    // Hand the failures back as a List
+    return new ArrayList<>(failed);
+  }
+
+  /**
+   * Makes sure the archive directory is present, creating it when it does not exist yet.
+   * @param fs             filesystem on which the archive directory lives
+   * @param baseArchiveDir archive directory that has to exist
+   * @throws IOException if the directory does not exist and could not be created
+   */
+  private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
+    throws IOException {
+    // mkdirs is only attempted when the directory is not already there
+    boolean ready = fs.exists(baseArchiveDir) || fs.mkdirs(baseArchiveDir);
+    if (!ready) {
+      throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
+    }
+    LOG.trace("Archive directory ready: {}", baseArchiveDir);
+  }
+
+  private static void handleDirectory(Configuration conf, FileSystem fs, Path 
baseArchiveDir,

Review Comment:
   Are all of these methods triggered as part of
   RestoreSnapshotHelper.copySnapshotForScanner? Can you check that, please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to