This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.16 by this push:
     new c816f54  PHOENIX-6273: Add support to handle MR Snapshot restore externally (#1110)
c816f54 is described below

commit c816f54b89ee8f02b94ecc92ba0ee6163a26fb1d
Author: Saksham Gangwar <sakshamgangwa...@gmail.com>
AuthorDate: Mon Jan 25 13:25:53 2021 -0800

    PHOENIX-6273: Add support to handle MR Snapshot restore externally (#1110)
    
    * PHOENIX-6273: Add support to handle MR Snapshot restore externally
---
 .../end2end/TableSnapshotReadsMapReduceIT.java     | 60 ++++++++++++++++++++--
 .../iterate/TableSnapshotResultIterator.java       | 57 ++++++++++++++------
 .../phoenix/mapreduce/PhoenixInputFormat.java      |  6 +++
 .../mapreduce/util/PhoenixConfigurationUtil.java   | 19 +++++++
 .../mapreduce/util/PhoenixMapReduceUtil.java       |  4 +-
 5 files changed, 122 insertions(+), 24 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index db90014..c264792 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -30,6 +31,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -38,12 +41,16 @@ import java.util.UUID;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -61,16 +68,19 @@ import 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PhoenixArray;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.Maps;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT 
{
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
@@ -101,6 +111,16 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
   private Path tmpDir;
   private Configuration conf;
   private static final Random RANDOM = new Random();
+  private Boolean isSnapshotRestoreDoneExternally;
+
+  public TableSnapshotReadsMapReduceIT(Boolean 
isSnapshotRestoreDoneExternally) {
+    this.isSnapshotRestoreDoneExternally = isSnapshotRestoreDoneExternally;
+  }
+
+  @Parameterized.Parameters
+  public static synchronized Collection<Boolean> 
snapshotRestoreDoneExternallyParams() {
+    return Arrays.asList(true, false);
+  }
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
@@ -166,6 +186,8 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
     Assert.assertEquals("Correct snapshot name not found in configuration", 
SNAPSHOT_NAME,
             config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));
 
+    TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
+
     try (Connection conn = DriverManager.getConnection(getUrl())) {
       // create table
       tableName = generateUniqueName();
@@ -181,7 +203,6 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
     config = job.getConfiguration();
     Assert.assertNull("Snapshot name is not null in Configuration",
             config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));
-
   }
 
   private Job createAndTestJob(Connection conn)
@@ -239,7 +260,7 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
 
   private void configureJob(Job job, String tableName, String inputQuery, 
String condition, boolean shouldSplit) throws Exception {
     try {
-      upsertAndSnapshot(tableName, shouldSplit);
+      upsertAndSnapshot(tableName, shouldSplit, job.getConfiguration());
       result = new ArrayList<>();
 
       job.setMapperClass(TableSnapshotMapper.class);
@@ -275,6 +296,7 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
       }
 
       assertFalse("Should only have stored" + result.size() + "rows in the 
table for the timestamp!", rs.next());
+      assertRestoreDirCount(conf, tmpDir.toString(), 1);
     } finally {
       deleteSnapshotIfExists(SNAPSHOT_NAME);
     }
@@ -335,7 +357,7 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
     stmt.execute();
   }
 
-  private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws 
Exception {
+  private void upsertAndSnapshot(String tableName, boolean shouldSplit, 
Configuration configuration) throws Exception {
     if (shouldSplit) {
       // having very few rows in table doesn't really help much with splitting 
case.
       // we should upsert large no of rows as a prerequisite to splitting
@@ -365,6 +387,14 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
       PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, 
tableName));
       upsertData(stmt, "DDDD", "SNFB", 45);
       conn.commit();
+      if (isSnapshotRestoreDoneExternally) {
+        //Performing snapshot restore which will be used during scans
+        Path rootDir = new Path(configuration.get(HConstants.HBASE_DIR));
+        FileSystem fs = rootDir.getFileSystem(configuration);
+        Path restoreDir = new 
Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
+        RestoreSnapshotHelper.copySnapshotForScanner(configuration, fs, 
rootDir, restoreDir, SNAPSHOT_NAME);
+        PhoenixConfigurationUtil.setMRSnapshotManagedExternally(configuration, 
true);
+      }
     }
   }
 
@@ -459,6 +489,28 @@ public class TableSnapshotReadsMapReduceIT extends 
BaseUniqueNamesOwnClusterIT {
     }
   }
 
+  /**
+   * Makes sure that the restore temp directory does not contain multiple subdirectories
+   * for the same snapshot restore.
+   * @param conf
+   * @param restoreDir
+   * @param expectedCount
+   * @throws IOException
+   */
+  private void assertRestoreDirCount(Configuration conf, String restoreDir, 
int expectedCount)
+          throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] subDirectories = fs.listStatus(new Path(restoreDir));
+    assertNotNull(subDirectories);
+    if (isSnapshotRestoreDoneExternally) {
+      //Snapshot Restore to be deleted externally by the caller
+      assertEquals(expectedCount, subDirectories.length);
+    } else {
+      //Snapshot Restore already deleted internally
+      assertEquals(0, subDirectories.length);
+    }
+  }
+
   public static class TableSnapshotMapper extends Mapper<NullWritable, 
PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index 9cca642..6efd928 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -21,15 +21,18 @@ package org.apache.phoenix.iterate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.phoenix.compile.ExplainPlanAttributes
-    .ExplainPlanAttributesBuilder;
+import 
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -79,8 +82,12 @@ public class TableSnapshotResultIterator implements 
ResultIterator {
     this.scan = scan;
     this.scanMetricsHolder = scanMetricsHolder;
     this.scanIterator = UNINITIALIZED_SCANNER;
-    this.restoreDir = new 
Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY),
-        UUID.randomUUID().toString());
+    if (PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) 
{
+      this.restoreDir = new 
Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
+    } else {
+      this.restoreDir = new 
Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY),
+          UUID.randomUUID().toString());
+    }
     this.snapshotName = configuration.get(
         PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
     this.rootDir = FSUtils.getRootDir(configuration);
@@ -89,19 +96,33 @@ public class TableSnapshotResultIterator implements 
ResultIterator {
   }
 
   private void init() throws IOException {
-    RestoreSnapshotHelper.RestoreMetaChanges meta =
-        RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, 
this.fs,
-            this.rootDir, this.restoreDir, this.snapshotName);
-    List<HRegionInfo> restoredRegions = meta.getRegionsToAdd();
-    this.htd = meta.getTableDescriptor();
-    this.regions = new ArrayList<>(restoredRegions.size());
-
-    for (HRegionInfo restoredRegion : restoredRegions) {
-      if (isValidRegion(restoredRegion)) {
-        this.regions.add(restoredRegion);
+    if 
(!PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) {
+      RestoreSnapshotHelper.RestoreMetaChanges meta = 
RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs, 
this.rootDir,
+          this.restoreDir, this.snapshotName);
+      List<HRegionInfo> restoredRegions = meta.getRegionsToAdd();
+      this.htd = meta.getTableDescriptor();
+      this.regions = new ArrayList<>(restoredRegions.size());
+      for (HRegionInfo restoredRegion : restoredRegions) {
+        if (isValidRegion(restoredRegion)) {
+          this.regions.add(restoredRegion);
+        }
+      }
+    } else {
+      Path snapshotDir = 
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+      HBaseProtos.SnapshotDescription snapshotDesc =
+          SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+      SnapshotManifest manifest =
+          SnapshotManifest.open(configuration, fs, snapshotDir, snapshotDesc);
+      List<SnapshotProtos.SnapshotRegionManifest> regionManifests = 
manifest.getRegionManifests();
+      this.regions = new ArrayList<>(regionManifests.size());
+      this.htd = manifest.getTableDescriptor();
+      for (SnapshotProtos.SnapshotRegionManifest srm : regionManifests) {
+        HRegionInfo hri = HRegionInfo.convert(srm.getRegionInfo());
+        if (isValidRegion(hri)) {
+          regions.add(hri);
+        }
       }
     }
-
     Collections.sort(this.regions);
     LOGGER.info("Initialization complete with " + regions.size() + " valid 
regions");
   }
@@ -165,7 +186,9 @@ public class TableSnapshotResultIterator implements 
ResultIterator {
     closed = true; // ok to say closed even if the below code throws an 
exception
     try {
       scanIterator.close();
-      fs.delete(this.restoreDir, true);
+      if 
(!PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) {
+        fs.delete(this.restoreDir, true);
+      }
     } catch (IOException e) {
       throw ServerUtil.parseServerException(e);
     } finally {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 4711dbb..946ae4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -210,12 +210,18 @@ public class PhoenixInputFormat<T extends DBWritable> 
extends InputFormat<NullWr
 
               // setting the snapshot configuration
               String snapshotName = 
configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+              String restoreDir = 
configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY);
+              boolean isSnapshotRestoreManagedExternally = 
PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration);
               Configuration config = 
queryPlan.getContext().getConnection().getQueryServices().getConfiguration();
               if (snapshotName != null) {
                   PhoenixConfigurationUtil.setSnapshotNameKey(config, 
snapshotName);
+                  PhoenixConfigurationUtil.setRestoreDirKey(config, 
restoreDir);
+                  
PhoenixConfigurationUtil.setMRSnapshotManagedExternally(config, 
isSnapshotRestoreManagedExternally);
               } else {
                   // making sure we unset snapshot name as new job doesn't 
need it
                   config.unset(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+                  config.unset(PhoenixConfigurationUtil.RESTORE_DIR_KEY);
+                  
config.unset(PhoenixConfigurationUtil.MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE);
               }
 
               return queryPlan;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 1d8fa57..9dc31b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -189,6 +189,12 @@ public final class PhoenixConfigurationUtil {
     // provide an absolute path to inject your multi input mapper logic
     public static final String MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ = 
"phoenix.mapreduce.multi.mapper.tracker.path";
 
+    // controls whether the MapReduce snapshot restore and cleanup operations used by scanners are
+    // handled internally by Phoenix or externally by the caller
+    public static final String MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = 
"phoenix.mapreduce.external.snapshot.restore";
+
+    // by default MR snapshot restore is handled internally by phoenix
+    public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = 
false;
 
     /**
      * Determines type of Phoenix Map Reduce job.
@@ -857,4 +863,17 @@ public final class PhoenixConfigurationUtil {
         configuration.set(MAPREDUCE_TENANT_ID, tenantId);
     }
 
+    public static void setMRSnapshotManagedExternally(Configuration 
configuration, Boolean isSnapshotRestoreManagedExternally) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(isSnapshotRestoreManagedExternally);
+        configuration.setBoolean(MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE, 
isSnapshotRestoreManagedExternally);
+    }
+
+    public static boolean isMRSnapshotManagedExternally(final Configuration 
configuration) {
+        Preconditions.checkNotNull(configuration);
+        boolean isSnapshotRestoreManagedExternally =
+            configuration.getBoolean(MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE, 
DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE);
+        return isSnapshotRestoreManagedExternally;
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index cab2361..368675d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -29,7 +29,6 @@ import org.apache.phoenix.mapreduce.PhoenixTTLTool;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 
 import java.io.IOException;
-import java.util.UUID;
 
 /**
  * Utility class for setting Configuration parameters for the Map Reduce job
@@ -181,8 +180,7 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
         PhoenixConfigurationUtil.setSnapshotNameKey(configuration, 
snapshotName);
         PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
-
-        PhoenixConfigurationUtil.setRestoreDirKey(configuration, new 
Path(restoreDir, UUID.randomUUID().toString()).toString());
+        PhoenixConfigurationUtil.setRestoreDirKey(configuration, 
restoreDir.toString());
         PhoenixConfigurationUtil.setSchemaType(configuration, schemaType);
         return configuration;
     }

Reply via email to