This is an automated email from the ASF dual-hosted git repository. gjacoby pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 51715b3 PHOENIX-6273: Add support to handle MR Snapshot restore externally (#1110) 51715b3 is described below commit 51715b3eda37d0d204102e02dd943e93c0068e95 Author: Saksham Gangwar <sakshamgangwa...@gmail.com> AuthorDate: Mon Jan 25 13:25:53 2021 -0800 PHOENIX-6273: Add support to handle MR Snapshot restore externally (#1110) * PHOENIX-6273: Add support to handle MR Snapshot restore externally --- .../end2end/TableSnapshotReadsMapReduceIT.java | 60 ++++++++++++++++++++-- .../iterate/TableSnapshotResultIterator.java | 57 ++++++++++++++------ .../phoenix/mapreduce/PhoenixInputFormat.java | 6 +++ .../mapreduce/util/PhoenixConfigurationUtil.java | 19 +++++++ .../mapreduce/util/PhoenixMapReduceUtil.java | 4 +- 5 files changed, 122 insertions(+), 24 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java index db90014..c264792 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -30,6 +31,8 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; @@ -38,12 +41,16 @@ import java.util.UUID; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -61,16 +68,19 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PhoenixArray; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import com.google.common.collect.Maps; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class); @@ -101,6 +111,16 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private Path tmpDir; private Configuration conf; private static final Random RANDOM = new Random(); + private Boolean isSnapshotRestoreDoneExternally; + + public TableSnapshotReadsMapReduceIT(Boolean isSnapshotRestoreDoneExternally) { + this.isSnapshotRestoreDoneExternally = isSnapshotRestoreDoneExternally; + } + + @Parameterized.Parameters + public static synchronized Collection<Boolean> snapshotRestoreDoneExternallyParams() { + return Arrays.asList(true, false); + } @BeforeClass public static synchronized void doSetup() throws Exception { @@ -166,6 +186,8 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { Assert.assertEquals("Correct snapshot name not found in configuration", SNAPSHOT_NAME, config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY)); + TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries(); + try (Connection conn = DriverManager.getConnection(getUrl())) { // create table tableName = generateUniqueName(); @@ -181,7 +203,6 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { config = job.getConfiguration(); Assert.assertNull("Snapshot name is not null in Configuration", config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY)); - } private Job createAndTestJob(Connection conn) @@ -239,7 +260,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception { try { - upsertAndSnapshot(tableName, shouldSplit); + upsertAndSnapshot(tableName, shouldSplit, job.getConfiguration()); result = new ArrayList<>(); job.setMapperClass(TableSnapshotMapper.class); @@ -275,6 +296,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { } assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next()); + assertRestoreDirCount(conf, tmpDir.toString(), 1); } finally { deleteSnapshotIfExists(SNAPSHOT_NAME); } @@ -335,7 +357,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { stmt.execute(); } - private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception { + private void upsertAndSnapshot(String tableName, boolean shouldSplit, Configuration configuration) throws Exception { if (shouldSplit) { // having very few rows in table doesn't really help much with splitting case. // we should upsert large no of rows as a prerequisite to splitting @@ -365,6 +387,14 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); upsertData(stmt, "DDDD", "SNFB", 45); conn.commit(); + if (isSnapshotRestoreDoneExternally) { + //Performing snapshot restore which will be used during scans + Path rootDir = new Path(configuration.get(HConstants.HBASE_DIR)); + FileSystem fs = rootDir.getFileSystem(configuration); + Path restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY)); + RestoreSnapshotHelper.copySnapshotForScanner(configuration, fs, rootDir, restoreDir, SNAPSHOT_NAME); + PhoenixConfigurationUtil.setMRSnapshotManagedExternally(configuration, true); + } } } @@ -459,6 +489,28 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { } } + /** + * Making sure that restore temp directory is not having multiple sub directories + * for same snapshot restore. + * @param conf + * @param restoreDir + * @param expectedCount + * @throws IOException + */ + private void assertRestoreDirCount(Configuration conf, String restoreDir, int expectedCount) + throws IOException { + FileSystem fs = FileSystem.get(conf); + FileStatus[] subDirectories = fs.listStatus(new Path(restoreDir)); + assertNotNull(subDirectories); + if (isSnapshotRestoreDoneExternally) { + //Snapshot Restore to be deleted externally by the caller + assertEquals(expectedCount, subDirectories.length); + } else { + //Snapshot Restore already deleted internally + assertEquals(0, subDirectories.length); + } + } + public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> { @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java index 9cca642..6efd928 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java @@ -21,15 +21,18 @@ package org.apache.phoenix.iterate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.phoenix.compile.ExplainPlanAttributes - .ExplainPlanAttributesBuilder; +import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.schema.tuple.Tuple; @@ -79,8 +82,12 @@ public class TableSnapshotResultIterator implements ResultIterator { this.scan = scan; this.scanMetricsHolder = scanMetricsHolder; this.scanIterator = UNINITIALIZED_SCANNER; - this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY), - UUID.randomUUID().toString()); + if (PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) { + this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY)); + } else { + this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY), + UUID.randomUUID().toString()); + } this.snapshotName = configuration.get( PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); this.rootDir = FSUtils.getRootDir(configuration); @@ -89,19 +96,33 @@ public class TableSnapshotResultIterator implements ResultIterator { } private void init() throws IOException { - RestoreSnapshotHelper.RestoreMetaChanges meta = - RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs, - this.rootDir, this.restoreDir, this.snapshotName); - List<HRegionInfo> restoredRegions = meta.getRegionsToAdd(); - this.htd = meta.getTableDescriptor(); - this.regions = new ArrayList<>(restoredRegions.size()); - - for (HRegionInfo restoredRegion : restoredRegions) { - if (isValidRegion(restoredRegion)) { - this.regions.add(restoredRegion); + if (!PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) { + RestoreSnapshotHelper.RestoreMetaChanges meta = RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs, this.rootDir, + this.restoreDir, this.snapshotName); + List<HRegionInfo> restoredRegions = meta.getRegionsToAdd(); + this.htd = meta.getTableDescriptor(); + this.regions = new ArrayList<>(restoredRegions.size()); + for (HRegionInfo restoredRegion : restoredRegions) { + if (isValidRegion(restoredRegion)) { + this.regions.add(restoredRegion); + } + } + } else { + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + HBaseProtos.SnapshotDescription snapshotDesc = + SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + SnapshotManifest manifest = + SnapshotManifest.open(configuration, fs, snapshotDir, snapshotDesc); + List<SnapshotProtos.SnapshotRegionManifest> regionManifests = manifest.getRegionManifests(); + this.regions = new ArrayList<>(regionManifests.size()); + this.htd = manifest.getTableDescriptor(); + for (SnapshotProtos.SnapshotRegionManifest srm : regionManifests) { + HRegionInfo hri = HRegionInfo.convert(srm.getRegionInfo()); + if (isValidRegion(hri)) { + regions.add(hri); + } } } - Collections.sort(this.regions); LOGGER.info("Initialization complete with " + regions.size() + " valid regions"); } @@ -165,7 +186,9 @@ public class TableSnapshotResultIterator implements ResultIterator { closed = true; // ok to say closed even if the below code throws an exception try { scanIterator.close(); - fs.delete(this.restoreDir, true); + if (!PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) { + fs.delete(this.restoreDir, true); + } } catch (IOException e) { throw ServerUtil.parseServerException(e); } finally { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 4711dbb..946ae4a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -210,12 +210,18 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr // setting the snapshot configuration String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); + String restoreDir = configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY); + boolean isSnapshotRestoreManagedExternally = PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration); Configuration config = queryPlan.getContext().getConnection().getQueryServices().getConfiguration(); if (snapshotName != null) { PhoenixConfigurationUtil.setSnapshotNameKey(config, snapshotName); + PhoenixConfigurationUtil.setRestoreDirKey(config, restoreDir); + PhoenixConfigurationUtil.setMRSnapshotManagedExternally(config, isSnapshotRestoreManagedExternally); } else { // making sure we unset snapshot name as new job doesn't need it config.unset(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); + config.unset(PhoenixConfigurationUtil.RESTORE_DIR_KEY); + config.unset(PhoenixConfigurationUtil.MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE); } return queryPlan; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 1d8fa57..9dc31b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -189,6 +189,12 @@ public final class PhoenixConfigurationUtil { // provide an absolute path to inject your multi input mapper logic public static final String MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ = "phoenix.mapreduce.multi.mapper.tracker.path"; + // provide control to whether or not handle mapreduce snapshot restore and cleanup operations which + // is used by scanners on phoenix side internally or handled by caller externally + public static final String MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = "phoenix.mapreduce.external.snapshot.restore"; + + // by default MR snapshot restore is handled internally by phoenix + public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false; /** * Determines type of Phoenix Map Reduce job. @@ -857,4 +863,17 @@ public final class PhoenixConfigurationUtil { configuration.set(MAPREDUCE_TENANT_ID, tenantId); } + public static void setMRSnapshotManagedExternally(Configuration configuration, Boolean isSnapshotRestoreManagedExternally) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(isSnapshotRestoreManagedExternally); + configuration.setBoolean(MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE, isSnapshotRestoreManagedExternally); + } + + public static boolean isMRSnapshotManagedExternally(final Configuration configuration) { + Preconditions.checkNotNull(configuration); + boolean isSnapshotRestoreManagedExternally = + configuration.getBoolean(MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE, DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE); + return isSnapshotRestoreManagedExternally; + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java index cab2361..368675d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -29,7 +29,6 @@ import org.apache.phoenix.mapreduce.PhoenixTTLTool; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import java.io.IOException; -import java.util.UUID; /** * Utility class for setting Configuration parameters for the Map Reduce job @@ -181,8 +180,7 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setInputClass(configuration, inputClass); PhoenixConfigurationUtil.setSnapshotNameKey(configuration, snapshotName); PhoenixConfigurationUtil.setInputTableName(configuration, tableName); - - PhoenixConfigurationUtil.setRestoreDirKey(configuration, new Path(restoreDir, UUID.randomUUID().toString()).toString()); + PhoenixConfigurationUtil.setRestoreDirKey(configuration, restoreDir.toString()); PhoenixConfigurationUtil.setSchemaType(configuration, schemaType); return configuration; }