Repository: hive
Updated Branches:
  refs/heads/master ec82b84f3 -> 7d2fb003a


HIVE-15534 : Update db/table repl.last.id at the end of REPL LOAD of a batch of 
events (Sushanth Sowmyan, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7d2fb003
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7d2fb003
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7d2fb003

Branch: refs/heads/master
Commit: 7d2fb003adc23601f6fb3fc2392d8283c9005a0b
Parents: ec82b84
Author: Sushanth Sowmyan <khorg...@gmail.com>
Authored: Tue Jan 17 14:01:42 2017 -0800
Committer: Sushanth Sowmyan <khorg...@gmail.com>
Committed: Tue Jan 17 14:03:19 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 130 +++++++++++++--
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  17 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 159 +++++++++++++++----
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |  29 ++++
 4 files changed, 295 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 778c13a..76e8f6c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.Shell;
@@ -44,6 +45,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static junit.framework.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
@@ -180,8 +182,7 @@ public class TestReplicationScenarios {
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
 
-    run("REPL STATUS " + dbName + "_dupe");
-    verifyResults(new String[] {replDumpId});
+    verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId);
 
     verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
     verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1);
@@ -256,11 +257,7 @@ public class TestReplicationScenarios {
     run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
 
     run("REPL STATUS " + dbName + "_dupe");
-//    verifyResults(new String[] {incrementalDumpId});
-    // TODO: this will currently not work because we need to add in ALTER_DB 
support into this
-    // and queue in a dummy ALTER_DB to update the repl.last.id on the last 
event of every
-    // incremental dump. Currently, the dump id fetched will be the last dump 
id at the time
-    // the db was created from the bootstrap export dump
+    verifyResults(new String[] {incrementalDumpId});
 
     // VERIFY tables and partitions on destination for equivalence.
 
@@ -332,7 +329,7 @@ public class TestReplicationScenarios {
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
-    verifySetup("REPL STATUS " + dbName + "_dupe", new String[] {replDumpId});
+    verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId});
 
     verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data);
     verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", 
ptn_data_1);
@@ -351,7 +348,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty);
     verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1);
     verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1",empty);
-    verifySetup("SELECT a from " + dbName + ".ptned3",ptn_data_2);
+    verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2);
 
     // replicate the incremental drops
 
@@ -663,6 +660,117 @@ public class TestReplicationScenarios {
     verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", 
ptn_data_2);
   }
 
+  @Test
+  public void testStatus() throws IOException {
+    // first test ReplStateMap functionality
+    Map<String,Long> cmap = new ReplStateMap<String,Long>();
+
+    Long oldV;
+    oldV = cmap.put("a",1L);
+    assertEquals(1L,cmap.get("a").longValue());
+    assertEquals(null,oldV);
+
+    cmap.put("b",2L);
+    oldV = cmap.put("b",-2L);
+    assertEquals(2L, cmap.get("b").longValue());
+    assertEquals(2L, oldV.longValue());
+
+    cmap.put("c",3L);
+    oldV = cmap.put("c",33L);
+    assertEquals(33L, cmap.get("c").longValue());
+    assertEquals(3L, oldV.longValue());
+
+    // Now, on to actually testing status - first, we bootstrap.
+
+    String testName = "incrementalStatus";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String lastReplDumpLocn = getResult(0, 0);
+    String lastReplDumpId = getResult(0, 1, true);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'");
+
+    // Bootstrap done, now on to incremental. First, we test db-level REPL 
LOADs.
+    // Both db-level and table-level repl.last.id must be updated.
+
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", 
lastReplDumpId,
+        "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) 
STORED AS TEXTFILE");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", 
lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", 
lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION 
(b=11)");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", 
lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", 
lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned RENAME TO  " + dbName + ".ptned_rn");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", 
lastReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)");
+    lastReplDumpId = verifyAndReturnDbReplStatus(dbName, null, lastReplDumpId,
+        "DROP TABLE " + dbName + ".ptned_rn");
+
+    // DB-level REPL LOADs testing done, now moving on to table level repl 
loads.
+    // In each of these cases, the table-level repl.last.id must move forward, 
but the
+    // db-level repl.last.id must not.
+
+    String lastTblReplDumpId = lastReplDumpId;
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) 
STORED AS TEXTFILE");
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)");
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION 
(b=11)");
+    lastTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')");
+    // Note : Not testing table rename because table rename replication is not 
supported for table-level repl.
+    String finalTblReplDumpId = verifyAndReturnTblReplStatus(
+        dbName, "ptned2", lastReplDumpId, lastTblReplDumpId,
+        "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)");
+
+    assertTrue(finalTblReplDumpId.compareTo(lastTblReplDumpId) > 0);
+
+    // TODO : currently not testing the following scenarios:
+    //   a) Multi-db wh-level REPL LOAD - need to add that
+    //   b) Insert into tables - quite a few cases need to be enumerated 
there, including dyn adds.
+
+  }
+
+  private String verifyAndReturnDbReplStatus(String dbName, String tblName, 
String prevReplDumpId, String cmd) throws IOException {
+    run(cmd);
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + prevReplDumpId);
+    String lastDumpLocn = getResult(0, 0);
+    String lastReplDumpId = getResult(0, 1, true);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + lastDumpLocn + "'");
+    verifyRun("REPL STATUS " + dbName + "_dupe", lastReplDumpId);
+    if (tblName != null){
+      verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
+    }
+    assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+    return lastReplDumpId;
+  }
+
+  // Tests that doing a table-level REPL LOAD updates table repl.last.id, but 
not db-level repl.last.id
+  private String verifyAndReturnTblReplStatus(
+      String dbName, String tblName, String lastDbReplDumpId, String 
prevReplDumpId, String cmd) throws IOException {
+    run(cmd);
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + "."+ tblName + " FROM " + prevReplDumpId);
+    String lastDumpLocn = getResult(0, 0);
+    String lastReplDumpId = getResult(0, 1, true);
+    run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn 
+ "'");
+    verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId);
+    verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId);
+    assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0);
+    return lastReplDumpId;
+  }
+
 
   private String getResult(int rowNum, int colNum) throws IOException {
     return getResult(rowNum,colNum,false);
@@ -715,6 +823,10 @@ public class TestReplicationScenarios {
     }
   }
 
+  private void verifyRun(String cmd, String data) throws IOException {
+    verifyRun(cmd, new String[] { data });
+  }
+
   private void verifyRun(String cmd, String[] data) throws IOException {
     run(cmd);
     verifyResults(data);

http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8c5cac2..7bb48a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -137,7 +138,8 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       tableExists = prepareImport(
           isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor,
           parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, 
fromTree.getText(),
-          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, 
outputs, rootTasks, LOG, ctx));
+          new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, 
outputs, rootTasks, LOG, ctx),
+          null, null);
 
     } catch (SemanticException e) {
       throw e;
@@ -173,7 +175,8 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, 
boolean waitOnPrecursor,
       String parsedLocation, String parsedTableName, String parsedDbName,
       LinkedHashMap<String, String> parsedPartSpec,
-      String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
+      String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x,
+      Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated
   ) throws IOException, MetaException, HiveException, URISyntaxException {
 
     // initialize load path
@@ -201,6 +204,11 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       // If the parsed statement contained a db.tablename specification, 
prefer that.
       dbname = parsedDbName;
     }
+    if (dbsUpdated != null){
+      dbsUpdated.put(
+          dbname,
+          Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID)));
+    }
 
     // Create table associated with the import
     // Executed if relevant, and used to contain all the other details about 
the table if not.
@@ -225,6 +233,11 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
       tblDesc.setTableName(parsedTableName);
     }
+    if (tablesUpdated != null){
+      tablesUpdated.put(
+          dbname + "." + tblDesc.getTableName(),
+          Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID)));
+    }
 
     List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
     Iterable<Partition> partitions = rv.getPartitions();

http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 98cd3b3..53ea346 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
@@ -336,7 +337,9 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
           // FIXME : implement consolidateEvent(..) similar to 
dumpEvent(ev,evRoot)
         }
-        LOG.info("Consolidation done, preparing to return {},{}->{}", 
dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+        LOG.info(
+            "Consolidation done, preparing to return {},{}->{}",
+            dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
         dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, 
bootDumpEndReplId);
         dmd.write();
 
@@ -376,7 +379,9 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         }
 
         LOG.info("Done dumping events, preparing to return {},{}", 
dumpRoot.toUri(), eventTo);
-        writeOutput(Arrays.asList("incremental", String.valueOf(eventFrom), 
String.valueOf(eventTo)), dmd.getDumpFilePath());
+        writeOutput(
+            Arrays.asList("incremental", String.valueOf(eventFrom), 
String.valueOf(eventTo)),
+            dmd.getDumpFilePath());
         dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo);
         dmd.write();
         // Set the correct last repl id to return to the user
@@ -773,7 +778,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       if ((!evDump) && (tblNameOrPattern != null) && 
!(tblNameOrPattern.isEmpty())) {
         // not an event dump, and table name pattern specified, this has to be 
a tbl-level dump
-        rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, 
path, null));
+        rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, 
path, null, null, null));
         return;
       }
 
@@ -807,9 +812,16 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         }
       } else {
         // event dump, each subdir is an individual event dump.
+        Arrays.sort(dirsInLoadPath); // we need to guarantee that the 
directory listing we got is in order of evid.
+
         Task<? extends Serializable> evTaskRoot = TaskFactory.get(new 
DependencyCollectionWork(), conf);
         Task<? extends Serializable> taskChainTail = evTaskRoot;
+
         int evstage = 0;
+        Long lastEvid = null;
+        Map<String,Long> dbsUpdated = new 
ReplicationSpec.ReplStateMap<String,Long>();
+        Map<String,Long> tablesUpdated = new 
ReplicationSpec.ReplStateMap<String,Long>();
+
         for (FileStatus dir : dirsInLoadPath){
           LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), 
dbNameOrPattern, tblNameOrPattern);
           // event loads will behave similar to table loads, with one crucial 
difference
@@ -831,8 +843,11 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           // Once this entire chain is generated, we add evTaskRoot to 
rootTasks, so as to execute the
           // entire chain
 
+          String locn = dir.getPath().toUri().toString();
+          DumpMetaData eventDmd = new DumpMetaData(new Path(locn));
           List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
-              dbNameOrPattern, tblNameOrPattern, 
dir.getPath().toUri().toString(), taskChainTail);
+              dbNameOrPattern, tblNameOrPattern, locn, taskChainTail,
+              dbsUpdated, tablesUpdated, eventDmd);
           LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? 
evTasks.size() : 0);
           if ((evTasks != null) && (!evTasks.isEmpty())){
             Task<? extends Serializable> barrierTask = TaskFactory.get(new 
DependencyCollectionWork(), conf);
@@ -845,13 +860,80 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
                 taskChainTail.getClass(), taskChainTail.getId(), 
barrierTask.getClass(), barrierTask.getId());
             taskChainTail = barrierTask;
             evstage++;
+            lastEvid = dmd.eventTo;
           }
         }
-        // TODO : Over here, we need to track a Map<dbName:String,evLast:Long> 
for every db updated
-        // and update repl.last.id for each, if this is a wh-level load, and 
if it is a db-level load,
-        // then a single repl.last.id update, and if this is a tbl-lvl load 
which does not alter the
-        // table itself, we'll need to update repl.last.id for that as well.
-        LOG.debug("added evTaskRoot {}:{}", evTaskRoot.getClass(), 
evTaskRoot.getId());
+
+        // Now, we need to update repl.last.id for the various parent objects 
that were updated.
+        // This update logic will work differently based on what "level" REPL 
LOAD was run on.
+        //  a) If this was a REPL LOAD at a table level, i.e. both 
dbNameOrPattern and
+        //     tblNameOrPattern were specified, then the table is the only 
thing we should
+        //     update the repl.last.id for.
+        //  b) If this was a db-level REPL LOAD, then we should update the db, 
as well as any
+        //     tables affected by partition level operations. (any table level 
ops will
+        //     automatically be updated as the table gets updated). Note - renames will need
+        //     careful handling.
+        //  c) If this was a wh-level REPL LOAD, then we should update every 
db for which there
+        //     were events occurring, as well as tables for which ptn-level ops
+        //     happened. Again, renames must be taken care of.
+        //
+        // So, what we're going to do is have each event load update 
dbsUpdated and tablesUpdated
+        // accordingly, but ignore updates to tablesUpdated & dbsUpdated in 
the case of a
+        // table-level REPL LOAD, using only the table itself. In the case of 
a db-level REPL
+        // LOAD, we ignore dbsUpdated, but inject our own, and do not ignore tablesUpdated.
+        // And for wh-level, we do no special processing, and use all of dbsUpdated and
+        // tablesUpdated as-is.
+
+        // Additional Note - although this var says "dbNameOrPattern", on REPL 
LOAD side,
+        // we do not support a pattern. It can be null or empty, in which case
+        // we re-use the existing name from the dump, or it can be specified,
+        // in which case we honour it. However, having this be a pattern is an 
error.
+        // Ditto for tblNameOrPattern.
+
+
+        if (evstage > 0){
+          if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){
+            // if tblNameOrPattern is specified, then dbNameOrPattern will be 
too, and
+            // thus, this is a table-level REPL LOAD - only table needs 
updating.
+            // If any of the individual events logged any other dbs as having 
changed,
+            // null them out.
+            dbsUpdated.clear();
+            tablesUpdated.clear();
+            tablesUpdated.put(dbNameOrPattern + "." + tblNameOrPattern, 
lastEvid);
+          } else  if ((dbNameOrPattern != null) && 
(!dbNameOrPattern.isEmpty())){
+            // if dbNameOrPattern is specified and tblNameOrPattern isn't, 
this is a
+            // db-level update, and thus, the database needs updating. tablesUpdated is kept as-is.
+            dbsUpdated.clear();
+            dbsUpdated.put(dbNameOrPattern, lastEvid);
+          }
+        }
+
+        for (String tableName : tablesUpdated.keySet()){
+          // weird - AlterTableDesc requires a HashMap to update props instead 
of a Map.
+          HashMap<String,String> mapProp = new HashMap<String,String>();
+          mapProp.put(
+              ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+              tablesUpdated.get(tableName).toString());
+          AlterTableDesc alterTblDesc =  new AlterTableDesc(
+              AlterTableDesc.AlterTableTypes.ADDPROPS, null, false);
+          alterTblDesc.setProps(mapProp);
+          alterTblDesc.setOldName(tableName);
+          Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+              new DDLWork(inputs, outputs, alterTblDesc), conf);
+          taskChainTail.addDependentTask(updateReplIdTask);
+          taskChainTail = updateReplIdTask;
+        }
+        for (String dbName : dbsUpdated.keySet()){
+          Map<String,String> mapProp = new HashMap<String,String>();
+          mapProp.put(
+              ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+              dbsUpdated.get(dbName).toString());
+          AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, 
mapProp);
+          Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
+              new DDLWork(inputs, outputs, alterDbDesc), conf);
+          taskChainTail.addDependentTask(updateReplIdTask);
+          taskChainTail = updateReplIdTask;
+        }
         rootTasks.add(evTaskRoot);
       }
 
@@ -864,23 +946,25 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
       String dbName, String tblName, String locn,
-      Task<? extends Serializable> precursor) throws SemanticException {
-    // Currently handles only create-tbl & insert-ptn, since only those are 
dumped
-    // As we add more event types, this will expand.
-    DumpMetaData dmd = new DumpMetaData(new Path(locn));
+      Task<? extends Serializable> precursor,
+      Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated,
+      DumpMetaData dmd) throws SemanticException {
     MessageDeserializer md = MessageFactory.getInstance().getDeserializer();
     switch (dmd.getDumpType()) {
       case EVENT_CREATE_TABLE: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor);
+        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, 
tablesUpdated);
       }
       case EVENT_ADD_PARTITION: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor);
+        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, 
tablesUpdated);
       }
       case EVENT_DROP_TABLE: {
         DropTableMessage dropTableMessage = 
md.getDropTableMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? 
dropTableMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
dropTableMessage.getTable() : tblName);
         DropTableDesc dropTableDesc = new DropTableDesc(
-            dbName + "." + (tblName == null ? dropTableMessage.getTable() : 
tblName),
-            null, true, true, 
getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+            actualDbName + "." + actualTblName,
+            null, true, true,
+            
getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
         Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, 
outputs, dropTableDesc), conf);
         if (precursor != null){
           precursor.addDependentTask(dropTableTask);
@@ -888,20 +972,23 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         List<Task<? extends Serializable>> tasks = new ArrayList<Task<? 
extends Serializable>>();
         tasks.add(dropTableTask);
         LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), 
dropTableDesc.getTableName());
+        dbsUpdated.put(actualDbName,dmd.getEventTo());
         return tasks;
       }
       case EVENT_DROP_PARTITION: {
         try {
           DropPartitionMessage dropPartitionMessage = 
md.getDropPartitionMessage(dmd.getPayload());
+          String actualDbName = ((dbName == null) || dbName.isEmpty() ? 
dropPartitionMessage.getDB() : dbName);
+          String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
dropPartitionMessage.getTable() : tblName);
           Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs;
           partSpecs =
               genPartSpecs(new Table(dropPartitionMessage.getTableObj()),
                   dropPartitionMessage.getPartitions());
           if (partSpecs.size() > 0) {
-            DropTableDesc dropPtnDesc =
-                new DropTableDesc(dbName + "."
-                    + (tblName == null ? dropPartitionMessage.getTable() : 
tblName), partSpecs, null,
-                    true, 
getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
+            DropTableDesc dropPtnDesc = new DropTableDesc(
+                actualDbName + "." + actualTblName,
+                partSpecs, null, true,
+                
getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom())));
             Task<DDLWork> dropPtnTask =
                 TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), 
conf);
             if (precursor != null) {
@@ -911,6 +998,8 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
             tasks.add(dropPtnTask);
             LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(),
                 dropPtnDesc.getTableName(), 
dropPartitionMessage.getPartitions());
+            dbsUpdated.put(actualDbName, dmd.getEventTo());
+            tablesUpdated.put(actualDbName + "." + actualTblName, 
dmd.getEventTo());
             return tasks;
           } else {
             throw new SemanticException(
@@ -926,7 +1015,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         }
       }
       case EVENT_ALTER_TABLE: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor);
+        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, 
tablesUpdated);
       }
       case EVENT_RENAME_TABLE: {
         AlterTableMessage renameTableMessage = 
md.getAlterTableMessage(dmd.getPayload());
@@ -960,6 +1049,12 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           List<Task<? extends Serializable>> tasks = new ArrayList<Task<? 
extends Serializable>>();
           tasks.add(renameTableTask);
           LOG.debug("Added rename table task : {}:{}->{}", 
renameTableTask.getId(), oldName, newName);
+          dbsUpdated.put(newDbName, dmd.getEventTo()); // oldDbName and 
newDbName *will* be the same if we're here
+          tablesUpdated.remove(oldName);
+          tablesUpdated.put(newName, dmd.getEventTo());
+          // Note : edge-case here in interaction with table-level REPL LOAD, 
where that nukes out tablesUpdated
+          // However, we explicitly don't support repl of that sort, and error 
out above if so. If that should
+          // ever change, this will need reworking.
           return tasks;
         } catch (Exception e) {
           if (!(e instanceof SemanticException)){
@@ -970,15 +1065,16 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         }
       }
       case EVENT_ALTER_PARTITION: {
-        return analyzeTableLoad(dbName, tblName, locn, precursor);
+        return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, 
tablesUpdated);
       }
       case EVENT_RENAME_PARTITION: {
         AlterPartitionMessage renamePtnMessage = 
md.getAlterPartitionMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? 
renamePtnMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? 
renamePtnMessage.getTable() : tblName);
 
         Map<String, String> newPartSpec = new LinkedHashMap<String,String>();
         Map<String, String> oldPartSpec = new LinkedHashMap<String,String>();
-        String tableName = dbName + "." +
-            ((tblName == null || tblName.isEmpty()) ? 
renamePtnMessage.getTable() : tblName);
+        String tableName = actualDbName + "." + actualTblName;
         try {
           org.apache.hadoop.hive.metastore.api.Table tblObj = 
renamePtnMessage.getTableObj();
           org.apache.hadoop.hive.metastore.api.Partition pobjBefore = 
renamePtnMessage.getPtnObjBefore();
@@ -1005,13 +1101,16 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         List<Task<? extends Serializable>> tasks = new ArrayList<Task<? 
extends Serializable>>();
         tasks.add(renamePtnTask);
         LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), 
oldPartSpec, newPartSpec);
+        dbsUpdated.put(actualDbName, dmd.getEventTo());
+        tablesUpdated.put(tableName, dmd.getEventTo());
         return tasks;
       }
       case EVENT_INSERT: {
         md = MessageFactory.getInstance().getDeserializer();
         InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
         // Piggybacking in Import logic for now
-        return analyzeTableLoad(insertMessage.getDB(), 
insertMessage.getTable(), locn, precursor);
+        return analyzeTableLoad(
+            insertMessage.getDB(), insertMessage.getTable(), locn, precursor, 
dbsUpdated, tablesUpdated);
       }
       case EVENT_UNKNOWN: {
         break;
@@ -1108,7 +1207,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       for (FileStatus tableDir : dirsInDbPath) {
         analyzeTableLoad(
-            dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+            dbName, null, tableDir.getPath().toUri().toString(), createDbTask, 
null, null);
       }
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -1117,7 +1216,8 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
   private List<Task<? extends Serializable>> analyzeTableLoad(
       String dbName, String tblName, String locn,
-      Task<? extends Serializable> precursor) throws SemanticException {
+      Task<? extends Serializable> precursor,
+      Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated) throws 
SemanticException {
     // Path being passed to us is a table dump location. We go ahead and load 
it in as needed.
     // If tblName is null, then we default to the table name specified in 
_metadata, which is good.
     // or are both specified, in which case, that's what we are intended to 
create the new table as.
@@ -1141,7 +1241,8 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, 
outputs, importTasks, LOG,
               ctx);
       ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, 
isPartSpecSet,
-          (precursor != null), parsedLocation, tblName, dbName, 
parsedPartSpec, locn, x);
+          (precursor != null), parsedLocation, tblName, dbName, 
parsedPartSpec, locn, x,
+          dbsUpdated, tablesUpdated);
 
       if (precursor != null) {
         for (Task<? extends Serializable> t : importTasks) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 402d96f..be17ffa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import javax.annotation.Nullable;
 import java.text.Collator;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -70,6 +71,34 @@ public class ReplicationSpec {
   static private Collator collator = Collator.getInstance();
 
   /**
+   * Class that extends HashMap with a slightly different put semantic, where
+   * put behaves as follows:
+   *  a) If the key does not already exist, then retains existing HashMap.put 
behaviour
+   *  b) If the map already contains an entry for the given key, then will 
replace only
+   *     if the new value is "greater" than the old value.
+   *
+   * The primary goal for this is to track repl updates for dbs and tables, to 
replace state
+   * only if the state is newer.
+   */
+  public static class ReplStateMap<K,V extends Comparable> extends 
HashMap<K,V> {
+    @Override
+    public V put(K k, V v){
+      if (!containsKey(k)){
+        return super.put(k,v);
+      }
+      V oldValue = get(k);
+      if (v.compareTo(oldValue) > 0){
+        return super.put(k,v);
+      }
+      // we did no replacement, but return the old value anyway. This
+      // seems most consistent with HashMap behaviour, because the "put"
+      // was effectively processed and consumed, although we threw away
+      // the new value.
+      return oldValue;
+    }
+  }
+
+  /**
    * Constructor to construct spec based on either the ASTNode that
    * corresponds to the replication clause itself, or corresponds to
    * the parent node, and will scan through the children to instantiate

Reply via email to