This is an automated email from the ASF dual-hosted Git repository. The user chinmayskulkarni pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 53e5b1a PHOENIX-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException 53e5b1a is described below commit 53e5b1a44ee70784c4c11716e73673f28c0f72f4 Author: sakshamgangwar <sakshamgangwa...@gmail.com> AuthorDate: Tue Sep 29 14:46:41 2020 -0700 PHOENIX-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org> --- .../end2end/TableSnapshotReadsMapReduceIT.java | 162 ++++++++++++++++++--- .../phoenix/mapreduce/PhoenixInputFormat.java | 7 +- 2 files changed, 147 insertions(+), 22 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java index 8719c35..930ff0b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.sql.Array; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -40,14 +41,22 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper; import 
org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat; import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.schema.types.PhoenixArray; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.Assert; import org.junit.Before; @@ -62,6 +71,10 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class); + private static final String STOCK_NAME = "STOCK_NAME"; + private static final String RECORDING_YEAR = "RECORDING_YEAR"; + private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER"; + private static final String MAX_RECORDING = "MAX_RECORDING"; private final static String SNAPSHOT_NAME = "FOO"; private static final String FIELD1 = "FIELD1"; private static final String FIELD2 = "FIELD2"; @@ -69,7 +82,14 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))"; private String UPSERT = "UPSERT into %s values (?, ?, ?)"; - + private static final String CREATE_STOCK_TABLE = + "CREATE TABLE IF NOT EXISTS %s ( " + STOCK_NAME + " VARCHAR NOT NULL , " + RECORDING_YEAR + + " INTEGER NOT NULL, " + RECORDINGS_QUARTER + " " + + " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( " + STOCK_NAME + ", " + + RECORDING_YEAR + " )) " + "SPLIT ON ('AA')"; + private static final String CREATE_STOCK_STATS_TABLE = + "CREATE TABLE IF NOT EXISTS %s(" + STOCK_NAME + " VARCHAR NOT NULL , " + MAX_RECORDING + + " DOUBLE CONSTRAINT pk PRIMARY KEY (" + 
STOCK_NAME + " ))"; private static List<List<Object>> result; private long timestamp; private String tableName; @@ -86,10 +106,11 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { @Before public void before() throws SQLException, IOException { // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + } // configure Phoenix M/R job to read snapshot conf = getUtility().getConfiguration(); @@ -126,6 +147,90 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { configureJob(job, tableName, inputQuery, null, false); } + @Test + public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception { + //Submitting and asserting successful Map Reduce Job over snapshots + PhoenixMapReduceUtil + .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null, + FIELD1, FIELD2, FIELD3); + configureJob(job, tableName, null, null, false); + + //Asserting that snapshot name is set in configuration + Configuration config = job.getConfiguration(); + Assert.assertEquals("Correct snapshot name not found in configuration", SNAPSHOT_NAME, + config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY)); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + // create table + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + //Submitting next map reduce job over table and making sure that it does not fail with + // any wrong snapshot properties set in common configurations which are + // used across all jobs. 
+ job = createAndTestJob(conn); + } + //Asserting that snapshot name is no more set in common shared configuration + config = job.getConfiguration(); + Assert.assertNull("Snapshot name is not null in Configuration", + config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY)); + + } + + private Job createAndTestJob(Connection conn) + throws SQLException, IOException, InterruptedException, ClassNotFoundException { + String stockTableName = generateUniqueName(); + String stockStatsTableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName)); + conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName)); + conn.commit(); + final Configuration conf = ((PhoenixConnection) conn).getQueryServices().getConfiguration(); + Job job = Job.getInstance(conf); + PhoenixMapReduceUtil.setInput(job, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class, + stockTableName, null, STOCK_NAME, RECORDING_YEAR, "0." 
+ RECORDINGS_QUARTER); + testJob(conn, job, stockTableName, stockStatsTableName); + return job; + } + + private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName) + throws SQLException, InterruptedException, IOException, ClassNotFoundException { + assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0, + TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries()); + upsertData(conn, stockTableName); + + // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints + job.getConfiguration().set("mapreduce.framework.name", "local"); + + setOutput(job, stockStatsTableName); + + job.setMapperClass(MapReduceIT.StockMapper.class); + job.setReducerClass(MapReduceIT.StockReducer.class); + job.setOutputFormatClass(PhoenixOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(DoubleWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(MapReduceIT.StockWritable.class); + + // run job and assert if success + assertTrue("Job didn't complete successfully! 
Check logs for reason.", job.waitForCompletion(true)); + } + + /** + * Custom output setting because output upsert statement setting is broken (PHOENIX-2677) + * + * @param job to update + */ + private void setOutput(Job job, String stockStatsTableName) { + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setOutputTableName(configuration, stockStatsTableName); + configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, "UPSERT into " + stockStatsTableName + + " (" + STOCK_NAME + ", " + MAX_RECORDING + ") values (?,?)"); + job.setOutputFormatClass(PhoenixOutputFormat.class); + } + + private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception { try { upsertAndSnapshot(tableName, shouldSplit); @@ -169,6 +274,22 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { } } + private void upsertData(Connection conn, String stockTableName) throws SQLException { + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName)); + upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3}); + upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3}); + conn.commit(); + } + + private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException { + int i = 1; + stmt.setString(i++, name); + stmt.setInt(i++, year); + Array recordings = new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, data); + stmt.setArray(i++, recordings); + stmt.execute(); + } + private void upsertData(String tableName) throws SQLException { Connection conn = DriverManager.getConnection(getUrl()); PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); @@ -192,26 +313,27 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT { upsertData(tableName); TableName hbaseTableName = TableName.valueOf(tableName); - Connection conn = 
DriverManager.getConnection(getUrl()); - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - if (shouldSplit) { - splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2); - } + if (shouldSplit) { + splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2); + } - admin.snapshot(SNAPSHOT_NAME, hbaseTableName); + admin.snapshot(SNAPSHOT_NAME, hbaseTableName); - List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots(); - Assert.assertEquals(tableName, snapshots.get(0).getTable()); + List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots(); + Assert.assertEquals(tableName, snapshots.get(0).getTable()); - // Capture the snapshot timestamp to use as SCN while reading the table later - // Assigning the timestamp value here will make tests less flaky - timestamp = System.currentTimeMillis(); + // Capture the snapshot timestamp to use as SCN while reading the table later + // Assigning the timestamp value here will make tests less flaky + timestamp = System.currentTimeMillis(); - // upsert data after snapshot - PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "DDDD", "SNFB", 45); - conn.commit(); + // upsert data after snapshot + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); + upsertData(stmt, "DDDD", "SNFB", 45); + conn.commit(); + } } private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 1272ecd..4711dbb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -210,9 +210,12 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr // setting the snapshot configuration String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); + Configuration config = queryPlan.getContext().getConnection().getQueryServices().getConfiguration(); if (snapshotName != null) { - PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection(). - getQueryServices().getConfiguration(), snapshotName); + PhoenixConfigurationUtil.setSnapshotNameKey(config, snapshotName); + } else { + // making sure we unset snapshot name as new job doesn't need it + config.unset(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); } return queryPlan;