This is an automated email from the ASF dual-hosted git repository. stoty pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 07af4bf PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion 07af4bf is described below commit 07af4bf4e6df9dc92b4f942b492b835e3a1a9d61 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Sat Dec 19 17:34:26 2020 +0530 PHOENIX-6274 : Deflake TableSnapshotReadsMapReduceIT.testMapReduceSnapshotsMultiRegion --- .../end2end/TableSnapshotReadsMapReduceIT.java | 144 ++++++++++++++++++--- 1 file changed, 123 insertions(+), 21 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java index 930ff0b..db90014 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java @@ -33,14 +33,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.UUID; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.io.NullWritable; @@ -79,9 +83,9 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private static final String FIELD1 = "FIELD1"; private static final String FIELD2 = "FIELD2"; private static final String FIELD3 = "FIELD3"; - private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + + private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))"; - private String UPSERT = "UPSERT into %s values (?, ?, ?)"; + private static final String UPSERT = "UPSERT into %s values (?, ?, ?)"; private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + STOCK_NAME + " VARCHAR NOT NULL , " + RECORDING_YEAR + " INTEGER NOT NULL, " + RECORDINGS_QUARTER + " " @@ -96,11 +100,13 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private Job job; private Path tmpDir; private Configuration conf; + private static final Random RANDOM = new Random(); @BeforeClass public static synchronized void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(1); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + getUtility().getHBaseAdmin().setBalancerRunning(false, true); } @Before @@ -270,7 +276,7 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next()); } finally { - deleteSnapshot(tableName); + deleteSnapshotIfExists(SNAPSHOT_NAME); } } @@ -302,6 +308,26 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); } + private void upsertDataBeforeSplit(String tableName) throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); + upsertData(stmt, "CCCC", "SSDD", RANDOM.nextInt()); + for (int i = 0; i < 100; i++) { + upsertData(stmt, "AAAA" + i, "JHHA" + i, RANDOM.nextInt()); + upsertData(stmt, "0000" + i, "JHHB" + i, RANDOM.nextInt()); + upsertData(stmt, "9999" + i, "JHHC" + i, RANDOM.nextInt()); + upsertData(stmt, "BBBB" + i, "JSHJ" + i, RANDOM.nextInt()); + upsertData(stmt, "BBBB1" + i, "JSHK" + i, RANDOM.nextInt()); + upsertData(stmt, "BBBB2" + i, "JSHL" + i, RANDOM.nextInt()); + upsertData(stmt, "CCCC1" + i, "SSDE" + i, RANDOM.nextInt()); + upsertData(stmt, "CCCC2" + i, "SSDF" + i, RANDOM.nextInt()); + upsertData(stmt, "PPPP" + i, "AJDH" + i, RANDOM.nextInt()); + upsertData(stmt, "SSSS" + i, "HSDG" + i, RANDOM.nextInt()); + upsertData(stmt, "XXXX" + i, "HDPP" + i, RANDOM.nextInt()); + } + conn.commit(); + } + private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException { stmt.setString(1, field1); stmt.setString(2, field2); @@ -310,17 +336,23 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { } private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception { - upsertData(tableName); + if (shouldSplit) { + // having very few rows in table doesn't really help much with splitting case. + // we should upsert large no of rows as a prerequisite to splitting + upsertDataBeforeSplit(tableName); + } else { + upsertData(tableName); + } TableName hbaseTableName = TableName.valueOf(tableName); - try (Connection conn = DriverManager.getConnection(getUrl())) { - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + try (Connection conn = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { if (shouldSplit) { - splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2); + splitTableSync(admin, hbaseTableName, Bytes.toBytes("CCCC"), 2); } - admin.snapshot(SNAPSHOT_NAME, hbaseTableName); + snapshotCreateSync(hbaseTableName, admin, SNAPSHOT_NAME); List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots(); Assert.assertEquals(tableName, snapshots.get(0).getTable()); @@ -336,25 +368,95 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { } } - private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName, - byte[] splitPoint , int expectedRegions) throws IOException, InterruptedException { - admin.split(hbaseTableName, splitPoint); - for (int i = 0; i < 100; i++) { - List<HRegionInfo> hRegionInfoList = admin.getTableRegions(hbaseTableName); - if (hRegionInfoList.size() >= expectedRegions) { + private void snapshotCreateSync(TableName hbaseTableName, + HBaseAdmin admin, String snapshotName) throws IOException, InterruptedException { + boolean isSnapshotCreated = false; + HBaseProtos.SnapshotDescription snapshotDescription = + HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build(); + // 3 retries while creating snapshot. if all 3 retries exhausted, we have + // some valid issue. + for (int i = 0; i < 3; i++) { + if (isSnapshotCreated) { break; } - LOGGER.info("Sleeping for 1000 ms while waiting for " + hbaseTableName.getNameAsString() + " to split"); + if (i > 0) { + LOGGER.info("Retry count {} for snapshot creation", i); + } + try { + admin.snapshot(snapshotName, hbaseTableName); + } catch (Exception e) { + LOGGER.info("Snapshot creation failure for {}", snapshotName, e); + continue; + } + // verify if snapshot was created in 10s + for (int j = 0; j < 10; j++) { + Thread.sleep(1000); + try { + if (admin.isSnapshotFinished(snapshotDescription)) { + isSnapshotCreated = true; + break; + } + } catch (Exception e) { + LOGGER.error("Snapshot creation failed.", e); + break; + } + } + } + if (!isSnapshotCreated) { + throw new IOException("Snapshot creation failed for " + snapshotName); + } + } + + private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName, + byte[] splitPoint, int expectedRegions) throws IOException, + InterruptedException { + admin.split(hbaseTableName, splitPoint); + AssignmentManager assignmentManager = + getUtility().getHBaseCluster().getMaster().getAssignmentManager(); + // wait for split daughter regions coming online for ~20s + for (int i = 0; i < 20; i++) { Thread.sleep(1000); + List<HRegion> regions = getUtility().getHBaseCluster() + .getRegions(hbaseTableName); + if (regions.size() >= expectedRegions) { + boolean allRegionsOnline = true; + for (HRegion region : regions) { + if (!assignmentManager.getRegionStates() + .isRegionOnline(region.getRegionInfo())) { + allRegionsOnline = false; + break; + } + } + if (allRegionsOnline) { + break; + } + } + LOGGER.info("Sleeping for 1000 ms while waiting for {} to split and all regions to come online", + hbaseTableName.getNameAsString()); } } - private void deleteSnapshot(String tableName) throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl()); - HBaseAdmin admin = - conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) { - admin.deleteSnapshot(SNAPSHOT_NAME); + private void deleteSnapshotIfExists(String snapshotName) throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + List<HBaseProtos.SnapshotDescription> snapshotDescriptions = admin.listSnapshots(); + boolean isSnapshotPresent = false; + if (CollectionUtils.isNotEmpty(snapshotDescriptions)) { + for (HBaseProtos.SnapshotDescription snapshotDescription : snapshotDescriptions) { + if (snapshotName.equals(snapshotDescription.getName())) { + isSnapshotPresent = true; + break; + } } + } + // delete snapshot only if exists and it is not corrupted + if (isSnapshotPresent) { + admin.deleteSnapshot(snapshotName); + } else { + LOGGER.info("Snapshot {} does not exist. Possibly corrupted due to region movements.", + snapshotName); + } + } } public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {