Repository: hive Updated Branches: refs/heads/master ec82b84f3 -> 7d2fb003a
HIVE-15534 : Update db/table repl.last.id at the end of REPL LOAD of a batch of events (Sushanth Sowmyan, reviewed by Daniel Dai) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7d2fb003 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7d2fb003 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7d2fb003 Branch: refs/heads/master Commit: 7d2fb003adc23601f6fb3fc2392d8283c9005a0b Parents: ec82b84 Author: Sushanth Sowmyan <khorg...@gmail.com> Authored: Tue Jan 17 14:01:42 2017 -0800 Committer: Sushanth Sowmyan <khorg...@gmail.com> Committed: Tue Jan 17 14:03:19 2017 -0800 ---------------------------------------------------------------------- .../hive/ql/TestReplicationScenarios.java | 130 +++++++++++++-- .../hive/ql/parse/ImportSemanticAnalyzer.java | 17 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 159 +++++++++++++++---- .../hadoop/hive/ql/parse/ReplicationSpec.java | 29 ++++ 4 files changed, 295 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 778c13a..76e8f6c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.Shell; @@ -44,6 +45,7 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static junit.framework.Assert.assertTrue; import static org.junit.Assert.assertEquals; @@ -180,8 +182,7 @@ public class TestReplicationScenarios { printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - run("REPL STATUS " + dbName + "_dupe"); - verifyResults(new String[] {replDumpId}); + verifyRun("REPL STATUS " + dbName + "_dupe", replDumpId); verifyRun("SELECT * from " + dbName + "_dupe.unptned", unptn_data); verifyRun("SELECT a from " + dbName + "_dupe.ptned WHERE b=1", ptn_data_1); @@ -256,11 +257,7 @@ public class TestReplicationScenarios { run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'"); run("REPL STATUS " + dbName + "_dupe"); -// verifyResults(new String[] {incrementalDumpId}); - // TODO: this will currently not work because we need to add in ALTER_DB support into this - // and queue in a dummy ALTER_DB to update the repl.last.id on the last event of every - // incremental dump. Currently, the dump id fetched will be the last dump id at the time - // the db was created from the bootstrap export dump + verifyResults(new String[] {incrementalDumpId}); // VERIFY tables and partitions on destination for equivalence. 
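For orientation, the incremental round trip that the re-enabled verifyResults assertion above checks looks roughly like the sketch below. It reuses this test class's own helpers (run, verifyRun, getResult, advanceDumpDir); the setup lines are reconstructed from the surrounding test code, not quoted from the patch.

    advanceDumpDir();
    run("REPL DUMP " + dbName + " FROM " + replDumpId);   // incremental dump, starting after the bootstrap dump id
    String incrementalDumpLocn = getResult(0, 0);         // where the dump was written
    String incrementalDumpId = getResult(0, 1, true);     // last event id covered by the dump
    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
    // With this patch, the event chain ends with ALTER DATABASE/TABLE tasks that
    // advance repl.last.id, so REPL STATUS now reports the incremental dump id:
    verifyRun("REPL STATUS " + dbName + "_dupe", new String[] {incrementalDumpId});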
@@ -332,7 +329,7 @@ public class TestReplicationScenarios { run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); printOutput(); run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); - verifySetup("REPL STATUS " + dbName + "_dupe", new String[] {replDumpId}); + verifySetup("REPL STATUS " + dbName + "_dupe", new String[]{replDumpId}); verifySetup("SELECT * from " + dbName + "_dupe.unptned", unptn_data); verifySetup("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'", ptn_data_1); @@ -351,7 +348,7 @@ public class TestReplicationScenarios { verifySetup("SELECT a from " + dbName + ".ptned WHERE b='2'", empty); verifySetup("SELECT a from " + dbName + ".ptned", ptn_data_1); verifySetup("SELECT a from " + dbName + ".ptned3 WHERE b=1",empty); - verifySetup("SELECT a from " + dbName + ".ptned3",ptn_data_2); + verifySetup("SELECT a from " + dbName + ".ptned3", ptn_data_2); // replicate the incremental drops @@ -663,6 +660,117 @@ public class TestReplicationScenarios { verifyRun("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2", ptn_data_2); } + @Test + public void testStatus() throws IOException { + // first test ReplStateMap functionality + Map<String,Long> cmap = new ReplStateMap<String,Long>(); + + Long oldV; + oldV = cmap.put("a",1L); + assertEquals(1L,cmap.get("a").longValue()); + assertEquals(null,oldV); + + cmap.put("b",2L); + oldV = cmap.put("b",-2L); + assertEquals(2L, cmap.get("b").longValue()); + assertEquals(2L, oldV.longValue()); + + cmap.put("c",3L); + oldV = cmap.put("c",33L); + assertEquals(33L, cmap.get("c").longValue()); + assertEquals(3L, oldV.longValue()); + + // Now, on to actually testing status - first, we bootstrap. + + String testName = "incrementalStatus"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + advanceDumpDir(); + run("REPL DUMP " + dbName); + String lastReplDumpLocn = getResult(0, 0); + String lastReplDumpId = getResult(0, 1, true); + run("REPL LOAD " + dbName + "_dupe FROM '" + lastReplDumpLocn + "'"); + + // Bootstrap done, now on to incremental. First, we test db-level REPL LOADs. + // Both db-level and table-level repl.last.id must be updated. + + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)"); + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=11)"); + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')"); + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId, + "ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_rn"); + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId, + "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)"); + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, null, lastReplDumpId, + "DROP TABLE " + dbName + ".ptned_rn"); + + // DB-level REPL LOADs testing done, now moving on to table level repl loads. + // In each of these cases, the table-level repl.last.id must move forward, but the + // db-level repl.last.id must not.
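As a side note, since the load chain persists this state as an ordinary object property (keyed repl.last.id, per the commit summary), it can also be inspected directly. A hypothetical spot-check, not part of the patch:

    run("SHOW TBLPROPERTIES " + dbName + "_dupe.ptned2 ('repl.last.id')"); // table-level state
    run("DESCRIBE DATABASE EXTENDED " + dbName + "_dupe");                 // db-level properties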
+ + String lastTblReplDumpId = lastReplDumpId; + lastTblReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) STORED AS TEXTFILE"); + lastTblReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)"); + lastTblReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION (b=11)"); + lastTblReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')"); + // Note : Not testing table rename because table rename replication is not supported for table-level repl. + String finalTblReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)"); + + assertTrue(finalTblReplDumpId.compareTo(lastTblReplDumpId) > 0); + + // TODO : currently not testing the following scenarios: + // a) Multi-db wh-level REPL LOAD - need to add that + // b) Insert into tables - quite a few cases need to be enumerated there, including dyn adds. + + } + + private String verifyAndReturnDbReplStatus(String dbName, String tblName, String prevReplDumpId, String cmd) throws IOException { + run(cmd); + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + prevReplDumpId); + String lastDumpLocn = getResult(0, 0); + String lastReplDumpId = getResult(0, 1, true); + run("REPL LOAD " + dbName + "_dupe FROM '" + lastDumpLocn + "'"); + verifyRun("REPL STATUS " + dbName + "_dupe", lastReplDumpId); + if (tblName != null){ + verifyRun("REPL STATUS " + dbName + "_dupe." + tblName, lastReplDumpId); + } + assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0); + return lastReplDumpId; + } + + // Tests that doing a table-level REPL LOAD updates table repl.last.id, but not db-level repl.last.id + private String verifyAndReturnTblReplStatus( + String dbName, String tblName, String lastDbReplDumpId, String prevReplDumpId, String cmd) throws IOException { + run(cmd); + advanceDumpDir(); + run("REPL DUMP " + dbName + "."+ tblName + " FROM " + prevReplDumpId); + String lastDumpLocn = getResult(0, 0); + String lastReplDumpId = getResult(0, 1, true); + run("REPL LOAD " + dbName + "_dupe." + tblName + " FROM '" + lastDumpLocn + "'"); + verifyRun("REPL STATUS " + dbName + "_dupe", lastDbReplDumpId); + verifyRun("REPL STATUS " + dbName + "_dupe." 
+ tblName, lastReplDumpId); + assertTrue(lastReplDumpId.compareTo(prevReplDumpId) > 0); + return lastReplDumpId; + } + private String getResult(int rowNum, int colNum) throws IOException { return getResult(rowNum,colNum,false); @@ -715,6 +823,10 @@ public class TestReplicationScenarios { } } + private void verifyRun(String cmd, String data) throws IOException { + verifyRun(cmd, new String[] { data }); + } + private void verifyRun(String cmd, String[] data) throws IOException { run(cmd); verifyResults(data); http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 8c5cac2..7bb48a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -137,7 +138,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { tableExists = prepareImport( isLocationSet, isExternalSet, isPartSpecSet, waitOnPrecursor, parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(), - new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx)); + new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx), + null, null); } catch (SemanticException e) { throw e; @@ -173,7 +175,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnPrecursor, String parsedLocation, String parsedTableName, String parsedDbName, LinkedHashMap<String, String> parsedPartSpec, - String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x + String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x, + Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated ) throws IOException, MetaException, HiveException, URISyntaxException { // initialize load path @@ -201,6 +204,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { // If the parsed statement contained a db.tablename specification, prefer that. dbname = parsedDbName; } + if (dbsUpdated != null){ + dbsUpdated.put( + dbname, + Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID))); + } // Create table associated with the import // Executed if relevant, and used to contain all the other details about the table if not. @@ -225,6 +233,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){ tblDesc.setTableName(parsedTableName); } + if (tablesUpdated != null){ + tablesUpdated.put( + dbname + "." 
+ tblDesc.getTableName(), + Long.valueOf(replicationSpec.get(ReplicationSpec.KEY.EVENT_ID))); + } List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>(); Iterable<Partition> partitions = rv.getPartitions(); http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 98cd3b3..53ea346 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; @@ -336,7 +337,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot) } - LOG.info("Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); + LOG.info( + "Consolidation done, preparing to return {},{}->{}", + dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId); dmd.write(); @@ -376,7 +379,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo); - writeOutput(Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), dmd.getDumpFilePath()); + writeOutput( + Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), + dmd.getDumpFilePath()); dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo); dmd.write(); // Set the correct last repl id to return to the user @@ -773,7 +778,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { // not an event dump, and table name pattern specified, this has to be a tbl-level dump - rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null)); + rootTasks.addAll(analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null, null, null)); return; } @@ -807,9 +812,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } else { // event dump, each subdir is an individual event dump. + Arrays.sort(dirsInLoadPath); // we need to guarantee that the directory listing we got is in order of evid. + Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf); Task<? 
extends Serializable> taskChainTail = evTaskRoot; + int evstage = 0; + Long lastEvid = null; + Map<String,Long> dbsUpdated = new ReplicationSpec.ReplStateMap<String,Long>(); + Map<String,Long> tablesUpdated = new ReplicationSpec.ReplStateMap<String,Long>(); + for (FileStatus dir : dirsInLoadPath){ LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); // event loads will behave similar to table loads, with one crucial difference @@ -831,8 +843,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the // entire chain + String locn = dir.getPath().toUri().toString(); + DumpMetaData eventDmd = new DumpMetaData(new Path(locn)); List<Task<? extends Serializable>> evTasks = analyzeEventLoad( - dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), taskChainTail); + dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, + dbsUpdated, tablesUpdated, eventDmd); LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0); if ((evTasks != null) && (!evTasks.isEmpty())){ Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); @@ -845,13 +860,80 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; evstage++; + lastEvid = dmd.eventTo; } } - // TODO : Over here, we need to track a Map<dbName:String,evLast:Long> for every db updated - // and update repl.last.id for each, if this is a wh-level load, and if it is a db-level load, - // then a single repl.last.id update, and if this is a tbl-lvl load which does not alter the - // table itself, we'll need to update repl.last.id for that as well. - LOG.debug("added evTaskRoot {}:{}", evTaskRoot.getClass(), evTaskRoot.getId()); + + // Now, we need to update repl.last.id for the various parent objects that were updated. + // This update logic will work differently based on what "level" REPL LOAD was run on. + // a) If this was a REPL LOAD at a table level, i.e. both dbNameOrPattern and + // tblNameOrPattern were specified, then the table is the only thing we should + // update the repl.last.id for. + // b) If this was a db-level REPL LOAD, then we should update the db, as well as any + // tables affected by partition level operations. (Any table level ops will + // automatically be updated as the table gets updated. Note - renames will need + // careful handling.) + // c) If this was a wh-level REPL LOAD, then we should update every db in which events + // occurred, as well as tables on which ptn-level ops + // happened. Again, renames must be taken care of. + // + // So, what we're going to do is have each event load update dbsUpdated and tablesUpdated + // accordingly, but ignore updates to tablesUpdated & dbsUpdated in the case of a + // table-level REPL LOAD, using only the table itself. In the case of a db-level REPL + // LOAD, we ignore dbsUpdated, but inject our own, and do not ignore tblsUpdated. + // And for wh-level, we do no special processing, and use all of dbsUpdated and + // tblsUpdated as-is.
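To make the three cases above concrete, here is a hypothetical walk-through (event ids, db and table names are invented; the clearing behaviour matches the code that follows):

    // Suppose one incremental batch carries events 100..105, touching db1.t1 (ptn-level ops)
    // and db1.t2 (create table), loaded into db1_dupe. At the end of the task chain:
    //  - table-level (REPL LOAD db1_dupe.t1 ...): both maps cleared, then
    //      tablesUpdated = {db1_dupe.t1 -> 105}; only the table's repl.last.id advances.
    //  - db-level (REPL LOAD db1_dupe ...): dbsUpdated cleared and re-seeded as
    //      {db1_dupe -> 105}; tablesUpdated keeps the ptn-level entry for db1_dupe.t1.
    //  - wh-level (REPL LOAD ...): both maps are used exactly as the event loads filled them.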
+ + // Additional Note - although this var says "dbNameOrPattern", on REPL LOAD side, + // we do not support a pattern. It can be null or empty, in which case + // we re-use the existing name from the dump, or it can be specified, + // in which case we honour it. However, having this be a pattern is an error. + // Ditto for tblNameOrPattern. + + + if (evstage > 0){ + if ((tblNameOrPattern != null) && (!tblNameOrPattern.isEmpty())){ + // if tblNameOrPattern is specified, then dbNameOrPattern will be too, and + // thus, this is a table-level REPL LOAD - only table needs updating. + // If any of the individual events logged any other dbs as having changed, + // null them out. + dbsUpdated.clear(); + tablesUpdated.clear(); + tablesUpdated.put(dbNameOrPattern + "." + tblNameOrPattern, lastEvid); + } else if ((dbNameOrPattern != null) && (!dbNameOrPattern.isEmpty())){ + // if dbNameOrPattern is specified and tblNameOrPattern isn't, this is a + // db-level update, and thus, the database needs updating. In addition, + // tablesUpdated is used as-is, so tables changed by ptn-level ops are updated too. + dbsUpdated.clear(); + dbsUpdated.put(dbNameOrPattern, lastEvid); + } + } + + for (String tableName : tablesUpdated.keySet()){ + // weird - AlterTableDesc requires a HashMap to update props instead of a Map. + HashMap<String,String> mapProp = new HashMap<String,String>(); + mapProp.put( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + tablesUpdated.get(tableName).toString()); + AlterTableDesc alterTblDesc = new AlterTableDesc( + AlterTableDesc.AlterTableTypes.ADDPROPS, null, false); + alterTblDesc.setProps(mapProp); + alterTblDesc.setOldName(tableName); + Task<? extends Serializable> updateReplIdTask = TaskFactory.get( + new DDLWork(inputs, outputs, alterTblDesc), conf); + taskChainTail.addDependentTask(updateReplIdTask); + taskChainTail = updateReplIdTask; + } + for (String dbName : dbsUpdated.keySet()){ + Map<String,String> mapProp = new HashMap<String,String>(); + mapProp.put( + ReplicationSpec.KEY.CURR_STATE_ID.toString(), + dbsUpdated.get(dbName).toString()); + AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp); + Task<? extends Serializable> updateReplIdTask = TaskFactory.get( + new DDLWork(inputs, outputs, alterDbDesc), conf); + taskChainTail.addDependentTask(updateReplIdTask); + taskChainTail = updateReplIdTask; + } rootTasks.add(evTaskRoot); } @@ -864,23 +946,25 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private List<Task<? extends Serializable>> analyzeEventLoad( String dbName, String tblName, String locn, - Task<? extends Serializable> precursor) throws SemanticException { - // Currently handles only create-tbl & insert-ptn, since only those are dumped - // As we add more event types, this will expand. - DumpMetaData dmd = new DumpMetaData(new Path(locn)); + Task<? extends Serializable> precursor, + Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, + DumpMetaData dmd) throws SemanticException { MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); switch (dmd.getDumpType()) { case EVENT_CREATE_TABLE: { - return analyzeTableLoad(dbName, tblName, locn, precursor); + return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); } case EVENT_ADD_PARTITION: { - return analyzeTableLoad(dbName, tblName, locn, precursor); + return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); } case EVENT_DROP_TABLE: { DropTableMessage dropTableMessage = md.getDropTableMessage(dmd.getPayload()); + String actualDbName = ((dbName == null) || dbName.isEmpty() ?
dropTableMessage.getDB() : dbName); + String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropTableMessage.getTable() : tblName); DropTableDesc dropTableDesc = new DropTableDesc( - dbName + "." + (tblName == null ? dropTableMessage.getTable() : tblName), - null, true, true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom()))); + actualDbName + "." + actualTblName, + null, true, true, + getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom()))); Task<DDLWork> dropTableTask = TaskFactory.get(new DDLWork(inputs, outputs, dropTableDesc), conf); if (precursor != null){ precursor.addDependentTask(dropTableTask); @@ -888,20 +972,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); tasks.add(dropTableTask); LOG.debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()); + dbsUpdated.put(actualDbName,dmd.getEventTo()); return tasks; } case EVENT_DROP_PARTITION: { try { DropPartitionMessage dropPartitionMessage = md.getDropPartitionMessage(dmd.getPayload()); + String actualDbName = ((dbName == null) || dbName.isEmpty() ? dropPartitionMessage.getDB() : dbName); + String actualTblName = ((tblName == null) || tblName.isEmpty() ? dropPartitionMessage.getTable() : tblName); Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs; partSpecs = genPartSpecs(new Table(dropPartitionMessage.getTableObj()), dropPartitionMessage.getPartitions()); if (partSpecs.size() > 0) { - DropTableDesc dropPtnDesc = - new DropTableDesc(dbName + "." - + (tblName == null ? dropPartitionMessage.getTable() : tblName), partSpecs, null, - true, getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom()))); + DropTableDesc dropPtnDesc = new DropTableDesc( + actualDbName + "." + actualTblName, + partSpecs, null, true, + getNewEventOnlyReplicationSpec(String.valueOf(dmd.getEventFrom()))); Task<DDLWork> dropPtnTask = TaskFactory.get(new DDLWork(inputs, outputs, dropPtnDesc), conf); if (precursor != null) { @@ -911,6 +998,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { tasks.add(dropPtnTask); LOG.debug("Added drop ptn task : {}:{},{}", dropPtnTask.getId(), dropPtnDesc.getTableName(), dropPartitionMessage.getPartitions()); + dbsUpdated.put(actualDbName, dmd.getEventTo()); + tablesUpdated.put(actualDbName + "." + actualTblName, dmd.getEventTo()); return tasks; } else { throw new SemanticException( @@ -926,7 +1015,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } case EVENT_ALTER_TABLE: { - return analyzeTableLoad(dbName, tblName, locn, precursor); + return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); } case EVENT_RENAME_TABLE: { AlterTableMessage renameTableMessage = md.getAlterTableMessage(dmd.getPayload()); @@ -960,6 +1049,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { List<Task<? extends Serializable>> tasks = new ArrayList<Task<? 
extends Serializable>>(); tasks.add(renameTableTask); LOG.debug("Added rename table task : {}:{}->{}", renameTableTask.getId(), oldName, newName); + dbsUpdated.put(newDbName, dmd.getEventTo()); // oldDbName and newDbName *will* be the same if we're here + tablesUpdated.remove(oldName); + tablesUpdated.put(newName, dmd.getEventTo()); + // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out tablesUpdated. + // However, we explicitly don't support repl of that sort, and error out above if so. If that should + // ever change, this will need reworking. return tasks; } catch (Exception e) { if (!(e instanceof SemanticException)){ @@ -970,15 +1065,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } case EVENT_ALTER_PARTITION: { - return analyzeTableLoad(dbName, tblName, locn, precursor); + return analyzeTableLoad(dbName, tblName, locn, precursor, dbsUpdated, tablesUpdated); } case EVENT_RENAME_PARTITION: { AlterPartitionMessage renamePtnMessage = md.getAlterPartitionMessage(dmd.getPayload()); + String actualDbName = ((dbName == null) || dbName.isEmpty() ? renamePtnMessage.getDB() : dbName); + String actualTblName = ((tblName == null) || tblName.isEmpty() ? renamePtnMessage.getTable() : tblName); Map<String, String> newPartSpec = new LinkedHashMap<String,String>(); Map<String, String> oldPartSpec = new LinkedHashMap<String,String>(); - String tableName = dbName + "." + - ((tblName == null || tblName.isEmpty()) ? renamePtnMessage.getTable() : tblName); + String tableName = actualDbName + "." + actualTblName; try { org.apache.hadoop.hive.metastore.api.Table tblObj = renamePtnMessage.getTableObj(); org.apache.hadoop.hive.metastore.api.Partition pobjBefore = renamePtnMessage.getPtnObjBefore(); @@ -1005,13 +1101,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); tasks.add(renamePtnTask); LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); + dbsUpdated.put(actualDbName, dmd.getEventTo()); + tablesUpdated.put(tableName, dmd.getEventTo()); return tasks; } case EVENT_INSERT: { md = MessageFactory.getInstance().getDeserializer(); InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload()); // Piggybacking in Import logic for now - return analyzeTableLoad(insertMessage.getDB(), insertMessage.getTable(), locn, precursor); + return analyzeTableLoad( + insertMessage.getDB(), insertMessage.getTable(), locn, precursor, dbsUpdated, tablesUpdated); } case EVENT_UNKNOWN: { break; } @@ -1108,7 +1207,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { for (FileStatus tableDir : dirsInDbPath) { analyzeTableLoad( - dbName, null, tableDir.getPath().toUri().toString(), createDbTask); + dbName, null, tableDir.getPath().toUri().toString(), createDbTask, null, null); } } catch (Exception e) { throw new SemanticException(e); } @@ -1117,7 +1216,8 @@ private List<Task<? extends Serializable>> analyzeTableLoad( String dbName, String tblName, String locn, - Task<? extends Serializable> precursor) throws SemanticException { + Task<? extends Serializable> precursor, + Map<String,Long> dbsUpdated, Map<String,Long> tablesUpdated) throws SemanticException { // Path being passed to us is a table dump location. We go ahead and load it in as needed.
// If tblName is null, then we default to the table name specified in _metadata, which is good. // or both are specified, in which case, that's what we are intended to create the new table as. @@ -1141,7 +1241,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG, ctx); ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet, - (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x); + (precursor != null), parsedLocation, tblName, dbName, parsedPartSpec, locn, x, + dbsUpdated, tablesUpdated); if (precursor != null) { for (Task<? extends Serializable> t : importTasks) { http://git-wip-us.apache.org/repos/asf/hive/blob/7d2fb003/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 402d96f..be17ffa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import javax.annotation.Nullable; import java.text.Collator; +import java.util.HashMap; import java.util.Map; /** @@ -70,6 +71,34 @@ public class ReplicationSpec { static private Collator collator = Collator.getInstance(); /** + * Class that extends HashMap with a slightly different put semantic, where + * put behaves as follows: + * a) If the key does not already exist, then retains existing HashMap.put behaviour + * b) If the map already contains an entry for the given key, then it will replace only + * if the new value is "greater" than the old value. + * + * The primary goal for this is to track repl updates for dbs and tables, to replace state + * only if the state is newer. + */ + public static class ReplStateMap<K,V extends Comparable> extends HashMap<K,V> { + @Override + public V put(K k, V v){ + if (!containsKey(k)){ + return super.put(k,v); + } + V oldValue = get(k); + if (v.compareTo(oldValue) > 0){ + return super.put(k,v); + } + // we did no replacement, but return the old value anyway. This + // seems most consistent with HashMap behaviour, because the "put" + // was effectively processed and consumed, although we threw away + // the new value. + return oldValue; + } + } + + /** * Constructor to construct spec based on either the ASTNode that * corresponds to the replication clause itself, or corresponds to * the parent node, and will scan through the children to instantiate
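To round out the ReplStateMap description above, a minimal standalone sketch of its "monotonic put" semantics, mirroring the assertions in TestReplicationScenarios#testStatus (the demo class itself is hypothetical, not part of the patch):

    import java.util.Map;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;

    public class ReplStateMapDemo {
      public static void main(String[] args) {
        Map<String, Long> state = new ReplStateMap<String, Long>();

        Long old = state.put("db1", 10L); // no prior entry: plain HashMap.put, returns null
        assert old == null;

        old = state.put("db1", 5L);       // 5 < 10: not replaced; the old value is returned
        assert old == 10L && state.get("db1") == 10L;

        old = state.put("db1", 42L);      // 42 > 10: replaced, so repl state only moves forward
        assert old == 10L && state.get("db1") == 42L;
      }
    }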