This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 53e5b1a  PHOENIX-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException
53e5b1a is described below

commit 53e5b1a44ee70784c4c11716e73673f28c0f72f4
Author: sakshamgangwar <sakshamgangwa...@gmail.com>
AuthorDate: Tue Sep 29 14:46:41 2020 -0700

    PHOENIX-6153 Table Map Reduce job after a Snapshot based job fails with CorruptedSnapshotException
    
    Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org>
---
 .../end2end/TableSnapshotReadsMapReduceIT.java     | 162 ++++++++++++++++++---
 .../phoenix/mapreduce/PhoenixInputFormat.java      |   7 +-
 2 files changed, 147 insertions(+), 22 deletions(-)
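
Background for the change below: PhoenixInputFormat copies the snapshot name into the Configuration held by the connection's query services, and that Configuration is shared by later MapReduce jobs from the same client. If a plain table job runs after a snapshot-based job without that key being cleared, it tries to resolve the stale snapshot and fails with CorruptedSnapshotException. The following is only a minimal sketch of the shared-configuration behaviour the fix relies on; it uses a plain Hadoop Configuration and a hypothetical key name, not Phoenix's actual SNAPSHOT_NAME_KEY wiring.

    import org.apache.hadoop.conf.Configuration;

    public class SharedConfSketch {
        // hypothetical key name, for illustration only
        static final String SNAPSHOT_KEY = "example.snapshot.name";

        public static void main(String[] args) {
            Configuration shared = new Configuration(false);

            // snapshot-based job: the key gets set on the shared Configuration
            shared.set(SNAPSHOT_KEY, "FOO");

            // a later table-based job reusing the same Configuration would still
            // see "FOO" unless the key is cleared, which is what the
            // PhoenixInputFormat change below does when no snapshot is requested
            shared.unset(SNAPSHOT_KEY);

            System.out.println(shared.get(SNAPSHOT_KEY));   // prints: null
        }
    }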

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 8719c35..930ff0b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -40,14 +41,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.Assert;
 import org.junit.Before;
@@ -62,6 +71,10 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
 
+  private static final String STOCK_NAME = "STOCK_NAME";
+  private static final String RECORDING_YEAR = "RECORDING_YEAR";
+  private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
+  private static final String MAX_RECORDING = "MAX_RECORDING";
   private final static String SNAPSHOT_NAME = "FOO";
   private static final String FIELD1 = "FIELD1";
   private static final String FIELD2 = "FIELD2";
@@ -69,7 +82,14 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
       " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
   private String UPSERT = "UPSERT into %s values (?, ?, ?)";
-
+  private static final String CREATE_STOCK_TABLE =
+          "CREATE TABLE IF NOT EXISTS %s ( " + STOCK_NAME + " VARCHAR NOT NULL , " + RECORDING_YEAR
+                  + "  INTEGER NOT  NULL,  " + RECORDINGS_QUARTER + " "
+                  + " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( " + STOCK_NAME + ", "
+                  + RECORDING_YEAR + " )) " + "SPLIT ON ('AA')";
+  private static final String CREATE_STOCK_STATS_TABLE =
+          "CREATE TABLE IF NOT EXISTS %s(" + STOCK_NAME + " VARCHAR NOT NULL , " + MAX_RECORDING
+                  + " DOUBLE CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + " ))";
   private static List<List<Object>> result;
   private long timestamp;
   private String tableName;
@@ -86,10 +106,11 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
   @Before
   public void before() throws SQLException, IOException {
     // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      tableName = generateUniqueName();
+      conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+      conn.commit();
+    }
 
     // configure Phoenix M/R job to read snapshot
     conf = getUtility().getConfiguration();
@@ -126,6 +147,90 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     configureJob(job, tableName, inputQuery, null, false);
   }
 
+  @Test
+  public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
+    //Submitting and asserting successful Map Reduce Job over snapshots
+    PhoenixMapReduceUtil
+            .setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null,
+                    FIELD1, FIELD2, FIELD3);
+    configureJob(job, tableName, null, null, false);
+
+    //Asserting that snapshot name is set in configuration
+    Configuration config = job.getConfiguration();
+    Assert.assertEquals("Correct snapshot name not found in configuration", SNAPSHOT_NAME,
+            config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // create table
+      tableName = generateUniqueName();
+      conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+      conn.commit();
+
+      //Submitting next map reduce job over table and making sure that it does not fail with
+      // any wrong snapshot properties set in common configurations which are
+      // used across all jobs.
+      job = createAndTestJob(conn);
+    }
+    //Asserting that snapshot name is no more set in common shared configuration
+    config = job.getConfiguration();
+    Assert.assertNull("Snapshot name is not null in Configuration",
+            config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));
+
+  }
+
+  private Job createAndTestJob(Connection conn)
+          throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    String stockTableName = generateUniqueName();
+    String stockStatsTableName = generateUniqueName();
+    conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+    conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+    conn.commit();
+    final Configuration conf = ((PhoenixConnection) conn).getQueryServices().getConfiguration();
+    Job job = Job.getInstance(conf);
+    PhoenixMapReduceUtil.setInput(job, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class,
+            stockTableName, null, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+    testJob(conn, job, stockTableName, stockStatsTableName);
+    return job;
+  }
+
+  private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName)
+          throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+    assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+            TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+    upsertData(conn, stockTableName);
+
+    // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
+    job.getConfiguration().set("mapreduce.framework.name", "local");
+
+    setOutput(job, stockStatsTableName);
+
+    job.setMapperClass(MapReduceIT.StockMapper.class);
+    job.setReducerClass(MapReduceIT.StockReducer.class);
+    job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DoubleWritable.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(MapReduceIT.StockWritable.class);
+
+    // run job and assert if success
+    assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
+  }
+
+  /**
+   * Custom output setting because output upsert statement setting is broken (PHOENIX-2677)
+   *
+   * @param job to update
+   */
+  private void setOutput(Job job, String stockStatsTableName) {
+    final Configuration configuration = job.getConfiguration();
+    PhoenixConfigurationUtil.setOutputTableName(configuration, stockStatsTableName);
+    configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, "UPSERT into " + stockStatsTableName +
+            " (" + STOCK_NAME + ", " + MAX_RECORDING + ") values (?,?)");
+    job.setOutputFormatClass(PhoenixOutputFormat.class);
+  }
+
+
   private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception {
     try {
       upsertAndSnapshot(tableName, shouldSplit);
@@ -169,6 +274,22 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     }
   }
 
+  private void upsertData(Connection conn, String stockTableName) throws SQLException {
+    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));
+    upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
+    upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
+    conn.commit();
+  }
+
+  private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
+    int i = 1;
+    stmt.setString(i++, name);
+    stmt.setInt(i++, year);
+    Array recordings = new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, data);
+    stmt.setArray(i++, recordings);
+    stmt.execute();
+  }
+
   private void upsertData(String tableName) throws SQLException {
     Connection conn = DriverManager.getConnection(getUrl());
     PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
@@ -192,26 +313,27 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
     upsertData(tableName);
 
     TableName hbaseTableName = TableName.valueOf(tableName);
-    Connection conn = DriverManager.getConnection(getUrl());
-    HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
 
-    if (shouldSplit) {
-      splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
-    }
+      if (shouldSplit) {
+        splitTableSync(admin, hbaseTableName, "BBBB".getBytes(), 2);
+      }
 
-    admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
+      admin.snapshot(SNAPSHOT_NAME, hbaseTableName);
 
-    List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
-    Assert.assertEquals(tableName, snapshots.get(0).getTable());
+      List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
+      Assert.assertEquals(tableName, snapshots.get(0).getTable());
 
-    // Capture the snapshot timestamp to use as SCN while reading the table later
-    // Assigning the timestamp value here will make tests less flaky
-    timestamp = System.currentTimeMillis();
+      // Capture the snapshot timestamp to use as SCN while reading the table later
+      // Assigning the timestamp value here will make tests less flaky
+      timestamp = System.currentTimeMillis();
 
-    // upsert data after snapshot
-    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "DDDD", "SNFB", 45);
-    conn.commit();
+      // upsert data after snapshot
+      PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+      upsertData(stmt, "DDDD", "SNFB", 45);
+      conn.commit();
+    }
   }
 
   private void splitTableSync(HBaseAdmin admin, TableName hbaseTableName,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 1272ecd..4711dbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -210,9 +210,12 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
 
               // setting the snapshot configuration
               String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+              Configuration config = queryPlan.getContext().getConnection().getQueryServices().getConfiguration();
               if (snapshotName != null) {
-                  PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
-                      getQueryServices().getConfiguration(), snapshotName);
+                  PhoenixConfigurationUtil.setSnapshotNameKey(config, snapshotName);
+              } else {
+                  // making sure we unset snapshot name as new job doesn't need it
+                  config.unset(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
               }
 
               return queryPlan;
