HIVE-16171 : Support replication of truncate table (Sankar Hariappan, reviewed by Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bbf5ecce Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bbf5ecce Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bbf5ecce Branch: refs/heads/master Commit: bbf5ecceed3d869ed83d28b001de4e2c2cc8fb7b Parents: fefeb2a Author: Sushanth Sowmyan <khorg...@gmail.com> Authored: Thu Apr 27 21:19:38 2017 -0700 Committer: Sushanth Sowmyan <khorg...@gmail.com> Committed: Thu Apr 27 21:26:04 2017 -0700 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 4 +- .../hive/ql/TestReplicationScenarios.java | 210 ++ metastore/if/hive_metastore.thrift | 2 + .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2195 +++++++----- .../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 139 + .../ThriftHiveMetastore_server.skeleton.cpp | 5 + .../hive/metastore/api/ThriftHiveMetastore.java | 3116 ++++++++++++------ .../gen-php/metastore/ThriftHiveMetastore.php | 1350 +++++--- .../hive_metastore/ThriftHiveMetastore-remote | 7 + .../hive_metastore/ThriftHiveMetastore.py | 948 ++++-- .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 62 + .../hadoop/hive/metastore/HiveAlterHandler.java | 10 +- .../hadoop/hive/metastore/HiveMetaStore.java | 153 +- .../hive/metastore/HiveMetaStoreClient.java | 17 + .../hadoop/hive/metastore/IMetaStoreClient.java | 14 + .../hive/metastore/MetaStoreEventListener.java | 12 +- .../metastore/events/AlterPartitionEvent.java | 14 +- .../hive/metastore/events/AlterTableEvent.java | 12 +- .../messaging/AlterPartitionMessage.java | 2 + .../metastore/messaging/AlterTableMessage.java | 2 + .../metastore/messaging/MessageFactory.java | 6 +- .../json/JSONAlterPartitionMessage.java | 9 +- .../messaging/json/JSONAlterTableMessage.java | 9 +- .../messaging/json/JSONMessageFactory.java | 13 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 78 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 21 + 
.../hive/ql/parse/ImportSemanticAnalyzer.java | 2 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 55 + .../repl/events/AlterPartitionHandler.java | 13 +- .../ql/parse/repl/events/AlterTableHandler.java | 21 +- .../columnStatsUpdateForStatsOptimizer_2.q.out | 8 +- 31 files changed, 5629 insertions(+), 2880 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bbf5ecce/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index df423f0..6f96e1d 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -166,7 +166,7 @@ public class DbNotificationListener extends MetaStoreEventListener { Table after = tableEvent.getNewTable(); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory - .buildAlterTableMessage(before, after).toString()); + .buildAlterTableMessage(before, after, tableEvent.getIsTruncateOp()).toString()); event.setDbName(after.getDbName()); event.setTableName(after.getTableName()); process(event, tableEvent); @@ -305,7 +305,7 @@ public class DbNotificationListener extends MetaStoreEventListener { Partition after = partitionEvent.getNewPartition(); NotificationEvent event = new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory - .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString()); + .buildAlterPartitionMessage(partitionEvent.getTable(), before, after, 
partitionEvent.getIsTruncateOp()).toString()); event.setDbName(before.getDbName()); event.setTableName(before.getTableName()); process(event, partitionEvent); http://git-wip-us.apache.org/repos/asf/hive/blob/bbf5ecce/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index ec238d2..9b8563b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -1241,6 +1241,216 @@ public class TestReplicationScenarios { } @Test + public void testTruncateTable() throws IOException { + String testName = "truncateTable"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] unptn_data = new String[] { "eleven", "twelve" }; + String[] empty = new String[] {}; + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", 
incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data); + + run("TRUNCATE TABLE " + dbName + ".unptned"); + verifySetup("SELECT a from " + dbName + ".unptned", empty); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + ".unptned", empty); + verifyRun("SELECT a from " + dbName + "_dupe.unptned", empty); + + String[] unptn_data_after_ins = new String[] { "thirteen" }; + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_after_ins[0] + "')"); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_after_ins); + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_after_ins); + } + + @Test + public void testTruncatePartitionedTable() throws IOException { + String testName = "truncatePartitionedTable"; + 
LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".ptned_1(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned_2(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + + String[] ptn_data_1 = new String[] { "fifteen", "fourteen", "thirteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "seventeen", "sixteen" }; + String[] empty = new String[] {}; + run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=1) values('" + ptn_data_1[2] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_1 PARTITION(b=2) values('" + ptn_data_2[2] + "')"); + + run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=10) values('" + ptn_data_1[2] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[0] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[1] + "')"); + run("INSERT INTO TABLE " + dbName + ".ptned_2 PARTITION(b=20) values('" + ptn_data_2[2] + "')"); + + verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + ".ptned_1 where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + dbName + ".ptned_2 where (b=10) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + ".ptned_2 
where (b=20) ORDER BY a", ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2) ORDER BY a", ptn_data_2); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10) ORDER BY a", ptn_data_1); + verifyRun("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20) ORDER BY a", ptn_data_2); + + run("TRUNCATE TABLE " + dbName + ".ptned_1 PARTITION(b=2)"); + verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=1) ORDER BY a", ptn_data_1); + verifySetup("SELECT a from " + dbName + ".ptned_1 where (b=2)", empty); + + run("TRUNCATE TABLE " + dbName + ".ptned_2"); + verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=10)", empty); + verifySetup("SELECT a from " + dbName + ".ptned_2 where (b=20)", empty); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=1) ORDER BY a", ptn_data_1); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_1 where (b=2)", empty); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=10)", empty); + verifySetup("SELECT a from " + dbName + "_dupe.ptned_2 where (b=20)", empty); + } + + @Test + public void testTruncateWithCM() throws IOException { + String testName = "truncateWithCM"; + LOG.info("Testing " + 
testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId); + + String[] empty = new String[] {}; + String[] unptn_data = new String[] { "eleven", "thirteen" }; + String[] unptn_data_load1 = new String[] { "eleven" }; + String[] unptn_data_load2 = new String[] { "eleven", "thirteen" }; + + // 3 events to insert, last repl ID: replDumpId+3 + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')"); + // 3 events to insert, last repl ID: replDumpId+6 + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data); + // 2 events to truncate, last repl ID: replDumpId+8 + run("TRUNCATE TABLE " + dbName + ".unptned"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", empty); + // 3 events to insert, last repl ID: replDumpId+11 + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1); + + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + // Dump and load only first insert (1 record) + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 3"); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1); + verifyRun("SELECT a from " 
+ dbName + "_dupe.unptned ORDER BY a", unptn_data_load1); + + // Dump and load only second insert (2 records) + advanceDumpDir(); + Integer lastReplID = Integer.valueOf(replDumpId); + lastReplID += 1000; + String toReplID = String.valueOf(lastReplID); + + run("REPL DUMP " + dbName + " FROM " + replDumpId + " TO " + toReplID + " LIMIT 3"); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load2); + + // Dump and load only truncate (0 records) + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId + " LIMIT 2"); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", empty); + + // Dump and load insert after truncate (1 record) + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + + verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1); + } + + @Test public void testStatus() throws IOException { // first test ReplStateMap functionality Map<String,Long> cmap = new ReplStateMap<String,Long>(); 
http://git-wip-us.apache.org/repos/asf/hive/blob/bbf5ecce/metastore/if/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift index a1bdc30..ff66836 100755 --- a/metastore/if/hive_metastore.thrift +++ b/metastore/if/hive_metastore.thrift @@ -1072,6 +1072,8 @@ service ThriftHiveMetastore extends fb303.FacebookService void drop_table_with_environment_context(1:string dbname, 2:string name, 3:bool deleteData, 4:EnvironmentContext environment_context) throws(1:NoSuchObjectException o1, 2:MetaException o3) + void truncate_table(1:string dbName, 2:string tableName, 3:list<string> partNames) + throws(1:MetaException o1) list<string> get_tables(1: string db_name, 2: string pattern) throws (1: MetaException o1) list<string> get_tables_by_type(1: string db_name, 2: string pattern, 3: string tableType) throws (1: MetaException o1) list<TableMeta> get_table_meta(1: string db_patterns, 2: string tbl_patterns, 3: list<string> tbl_types)