HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f519db7e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f519db7e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f519db7e Branch: refs/heads/master-txnstats Commit: f519db7eafacb4b4d2d9fe2a9e10e908d8077224 Parents: 285a9b4 Author: Sankar Hariappan <sank...@apache.org> Authored: Tue Jul 3 15:32:05 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Tue Jul 3 15:32:05 2018 +0530 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 209 +- .../listener/DummyRawStoreFailEvent.java | 15 + .../listener/TestDbNotificationListener.java | 5 + .../hive/ql/parse/TestReplicationScenarios.java | 72 - .../TestReplicationScenariosAcidTables.java | 602 ++- ...TestReplicationScenariosAcrossInstances.java | 15 +- .../hadoop/hive/ql/parse/WarehouseInstance.java | 5 + .../metastore/SynchronizedMetaStoreClient.java | 5 + .../apache/hadoop/hive/ql/exec/MoveTask.java | 6 +- .../hadoop/hive/ql/exec/ReplCopyTask.java | 5 +- .../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 31 +- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 4 +- .../IncrementalLoadTasksBuilder.java | 73 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 37 +- .../hadoop/hive/ql/io/HiveInputFormat.java | 24 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 18 +- .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 7 +- .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 10 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 167 +- .../hadoop/hive/ql/metadata/HiveUtils.java | 11 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 83 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 +- .../ql/parse/UpdateDeleteSemanticAnalyzer.java | 16 +- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 2 +- .../hadoop/hive/ql/parse/repl/dump/Utils.java | 4 - .../repl/dump/events/CommitTxnHandler.java | 125 +- .../ql/parse/repl/dump/events/EventHandler.java | 23 +- .../parse/repl/dump/events/InsertHandler.java | 4 + .../parse/repl/load/UpdatedMetaDataTracker.java | 124 +- .../repl/load/message/AbortTxnHandler.java | 7 +- .../repl/load/message/AllocWriteIdHandler.java | 2 +- .../repl/load/message/CommitTxnHandler.java | 78 +- .../parse/repl/load/message/MessageHandler.java | 8 +- .../parse/repl/load/message/OpenTxnHandler.java | 7 +- .../apache/hadoop/hive/ql/plan/MoveWork.java | 12 +- .../apache/hadoop/hive/ql/plan/ReplTxnWork.java | 15 + .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2675 +++++++----- .../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 126 + .../ThriftHiveMetastore_server.skeleton.cpp | 5 + .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3905 ++++++++++-------- .../gen/thrift/gen-cpp/hive_metastore_types.h | 218 +- .../metastore/api/AddDynamicPartitions.java | 32 +- .../api/AllocateTableWriteIdsRequest.java | 68 +- .../api/AllocateTableWriteIdsResponse.java | 36 +- .../metastore/api/ClearFileMetadataRequest.java | 32 +- .../hive/metastore/api/ClientCapabilities.java | 32 +- .../hive/metastore/api/CommitTxnRequest.java | 168 +- .../hive/metastore/api/CompactionRequest.java | 44 +- .../hive/metastore/api/CreationMetadata.java | 32 +- .../metastore/api/FindSchemasByColsResp.java | 36 +- .../hive/metastore/api/FireEventRequest.java | 32 +- .../metastore/api/GetAllFunctionsResponse.java | 36 +- .../api/GetFileMetadataByExprRequest.java | 32 +- .../api/GetFileMetadataByExprResult.java | 48 +- .../metastore/api/GetFileMetadataRequest.java | 32 +- 
.../metastore/api/GetFileMetadataResult.java | 44 +- .../hive/metastore/api/GetTablesRequest.java | 32 +- .../hive/metastore/api/GetTablesResult.java | 36 +- .../metastore/api/GetValidWriteIdsRequest.java | 32 +- .../metastore/api/GetValidWriteIdsResponse.java | 36 +- .../api/HeartbeatTxnRangeResponse.java | 64 +- .../metastore/api/InsertEventRequestData.java | 227 +- .../hadoop/hive/metastore/api/LockRequest.java | 36 +- .../hive/metastore/api/Materialization.java | 32 +- .../api/NotificationEventResponse.java | 36 +- .../metastore/api/PutFileMetadataRequest.java | 64 +- .../api/ReplTblWriteIdStateRequest.java | 32 +- .../hive/metastore/api/SchemaVersion.java | 36 +- .../hive/metastore/api/ShowCompactResponse.java | 36 +- .../hive/metastore/api/ShowLocksResponse.java | 36 +- .../hive/metastore/api/TableValidWriteIds.java | 32 +- .../hive/metastore/api/ThriftHiveMetastore.java | 3468 ++++++++++------ .../hive/metastore/api/WMFullResourcePlan.java | 144 +- .../api/WMGetAllResourcePlanResponse.java | 36 +- .../WMGetTriggersForResourePlanResponse.java | 36 +- .../api/WMValidateResourcePlanResponse.java | 64 +- .../hive/metastore/api/WriteEventInfo.java | 1012 +++++ .../api/WriteNotificationLogRequest.java | 949 +++++ .../api/WriteNotificationLogResponse.java | 283 ++ .../gen-php/metastore/ThriftHiveMetastore.php | 1630 ++++---- .../src/gen/thrift/gen-php/metastore/Types.php | 1630 +++++--- .../hive_metastore/ThriftHiveMetastore-remote | 7 + .../hive_metastore/ThriftHiveMetastore.py | 1139 ++--- .../gen/thrift/gen-py/hive_metastore/ttypes.py | 933 +++-- .../gen/thrift/gen-rb/hive_metastore_types.rb | 86 +- .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 54 + .../hadoop/hive/metastore/HiveMetaStore.java | 86 + .../hive/metastore/HiveMetaStoreClient.java | 10 +- .../hadoop/hive/metastore/IMetaStoreClient.java | 16 +- .../hive/metastore/MetaStoreEventListener.java | 12 + .../metastore/MetaStoreListenerNotifier.java | 6 + .../hadoop/hive/metastore/ObjectStore.java | 60 + .../apache/hadoop/hive/metastore/RawStore.java | 14 + .../hive/metastore/ReplChangeManager.java | 10 +- .../hive/metastore/cache/CachedStore.java | 12 + .../hive/metastore/events/AcidWriteEvent.java | 91 + .../metastore/messaging/AcidWriteMessage.java | 50 + .../metastore/messaging/CommitTxnMessage.java | 23 + .../hive/metastore/messaging/EventMessage.java | 3 +- .../messaging/MessageDeserializer.java | 9 + .../metastore/messaging/MessageFactory.java | 12 + .../messaging/json/JSONAcidWriteMessage.java | 150 + .../messaging/json/JSONCommitTxnMessage.java | 95 + .../messaging/json/JSONMessageDeserializer.java | 9 + .../messaging/json/JSONMessageFactory.java | 8 + .../model/MTxnWriteNotificationLog.java | 123 + .../hive/metastore/tools/SQLGenerator.java | 9 + .../hadoop/hive/metastore/txn/TxnDbUtil.java | 28 + .../hadoop/hive/metastore/txn/TxnHandler.java | 187 +- .../hadoop/hive/metastore/txn/TxnStore.java | 11 + .../hadoop/hive/metastore/utils/FileUtils.java | 12 +- .../src/main/resources/package.jdo | 35 + .../main/sql/derby/hive-schema-3.1.0.derby.sql | 15 + .../main/sql/derby/hive-schema-4.0.0.derby.sql | 15 + .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql | 1 - .../sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql | 16 + .../main/sql/mssql/hive-schema-3.1.0.mssql.sql | 17 + .../main/sql/mssql/hive-schema-4.0.0.mssql.sql | 17 + .../sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql | 16 + .../main/sql/mysql/hive-schema-3.0.0.mysql.sql | 1 - .../main/sql/mysql/hive-schema-3.1.0.mysql.sql | 16 + .../main/sql/mysql/hive-schema-4.0.0.mysql.sql | 
16 + .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 4 +- .../sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql | 16 + .../sql/oracle/hive-schema-3.0.0.oracle.sql | 1 - .../sql/oracle/hive-schema-3.1.0.oracle.sql | 15 + .../sql/oracle/hive-schema-4.0.0.oracle.sql | 15 + .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 4 +- .../oracle/upgrade-3.0.0-to-3.1.0.oracle.sql | 16 + .../sql/postgres/hive-schema-3.0.0.postgres.sql | 2 - .../sql/postgres/hive-schema-3.1.0.postgres.sql | 15 + .../sql/postgres/hive-schema-4.0.0.postgres.sql | 15 + .../upgrade-3.0.0-to-3.1.0.postgres.sql | 16 + .../src/main/thrift/hive_metastore.thrift | 30 +- .../DummyRawStoreControlledCommit.java | 11 + .../DummyRawStoreForJdoConnection.java | 10 + .../HiveMetaStoreClientPreCatalog.java | 10 +- 137 files changed, 15896 insertions(+), 7205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 6321f9b..717cc8a 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -23,6 +23,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -75,11 +76,14 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -269,10 +273,16 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener public PartitionFiles next() { try { Partition p = partitionIter.next(); - List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation())); + Iterator<String> fileIterator; + //For transactional tables, the actual file copy will be done by acid write event during replay of commit txn. 
+ if (!TxnUtils.isTransactionalTable(t)) { + List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation())); + fileIterator = files.iterator(); + } else { + fileIterator = Collections.emptyIterator(); + } PartitionFiles partitionFiles = - new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()), - files.iterator()); + new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()), fileIterator); return partitionFiles; } catch (MetaException e) { throw new RuntimeException(e); @@ -414,10 +424,15 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener class FileChksumIterator implements Iterator<String> { private List<String> files; private List<String> chksums; + private List<String> subDirs; int i = 0; FileChksumIterator(List<String> files, List<String> chksums) { + this(files, chksums, null); + } + FileChksumIterator(List<String> files, List<String> chksums, List<String> subDirs) { this.files = files; this.chksums = chksums; + this.subDirs = subDirs; } @Override public boolean hasNext() { @@ -428,7 +443,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener public String next() { String result; try { - result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null, null); + result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null, + subDirs != null ? subDirs.get(i) : null); } catch (IOException e) { // File operations failed LOG.error("Encoding file URI failed with error " + e.getMessage()); @@ -623,6 +639,23 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener } } + @Override + public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator) + throws MetaException { + AcidWriteMessage msg = msgFactory.buildAcidWriteMessage(acidWriteEvent, + new FileChksumIterator(acidWriteEvent.getFiles(), acidWriteEvent.getChecksums(), + acidWriteEvent.getSubDirs())); + NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(), msg.toString()); + event.setMessageFormat(msgFactory.getMessageFormat()); + event.setDbName(acidWriteEvent.getDatabase()); + event.setTableName(acidWriteEvent.getTable()); + try { + addWriteNotificationLog(event, acidWriteEvent, dbConn, sqlGenerator, msg); + } catch (SQLException e) { + throw new MetaException("Unable to add write notification log " + StringUtils.stringifyException(e)); + } + } + private int now() { long millis = System.currentTimeMillis(); millis /= 1000; @@ -634,12 +667,133 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener return (int)millis; } + /** + * Close statement instance. + * @param stmt statement instance. + */ + private static void closeStmt(Statement stmt) { + try { + if (stmt != null && !stmt.isClosed()) { + stmt.close(); + } + } catch (SQLException e) { + LOG.warn("Failed to close statement " + e.getMessage()); + } + } + + /** + * Close the ResultSet. 
+ * @param rs may be {@code null} + */ + private static void close(ResultSet rs) { + try { + if (rs != null && !rs.isClosed()) { + rs.close(); + } + } catch(SQLException ex) { + LOG.warn("Failed to close result set " + ex.getMessage()); + } + } + + private long getNextNLId(Statement stmt, SQLGenerator sqlGenerator, String sequence) + throws SQLException, MetaException { + String s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " + + "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + quoteString(sequence)); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = null; + try { + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction database not properly configured, can't find next NL id."); + } + + long nextNLId = rs.getLong(1); + long updatedNLId = nextNLId + 1; + s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " + + quoteString(sequence); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + return nextNLId; + }finally { + close(rs); + } + } + + private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent acidWriteEvent, Connection dbConn, + SQLGenerator sqlGenerator, AcidWriteMessage msg) throws MetaException, SQLException { + LOG.debug("DbNotificationListener: adding write notification log for : {}", event.getMessage()); + assert ((dbConn != null) && (sqlGenerator != null)); + + Statement stmt =null; + ResultSet rs = null; + String dbName = acidWriteEvent.getDatabase(); + String tblName = acidWriteEvent.getTable(); + String partition = acidWriteEvent.getPartition(); + String tableObj = msg.getTableObjStr(); + String partitionObj = msg.getPartitionObjStr(); + String files = ReplChangeManager.joinWithSeparator(msg.getFiles()); + + try { + stmt = dbConn.createStatement(); + if (sqlGenerator.getDbProduct() == MYSQL) { + stmt.execute("SET @@session.sql_mode=ANSI_QUOTES"); + } + + String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" + + " \"TXN_WRITE_NOTIFICATION_LOG\" " + + "where \"WNL_DATABASE\" = " + quoteString(dbName) + + "and \"WNL_TABLE\" = " + quoteString(tblName) + " and \"WNL_PARTITION\" = " + + quoteString(partition) + " and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId())); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + // if rs is empty then no lock is taken and thus it can not cause deadlock. + long nextNLId = getNextNLId(stmt, sqlGenerator, + "org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog"); + s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" (\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\"," + + " \"WNL_DATABASE\", \"WNL_TABLE\"," + + " \"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", \"WNL_FILES\", \"WNL_EVENT_TIME\")" + + " values (" + nextNLId + + "," + acidWriteEvent.getTxnId() + "," + acidWriteEvent.getWriteId()+ "," + + quoteString(dbName)+ "," + quoteString(tblName)+ "," + quoteString(partition)+ "," + + quoteString(tableObj)+ "," + quoteString(partitionObj) + "," + quoteString(files)+ + "," + now() + ")"; + LOG.info("Going to execute insert <" + s + ">"); + stmt.execute(sqlGenerator.addEscapeCharacters(s)); + } else { + String existingFiles = rs.getString(1); + if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) { + // If list of files are already present then no need to update it again. This scenario can come in case of + // retry done to the meta store for the same operation. 
+ LOG.info("file list " + files + " already present"); + return; + } + long nlId = rs.getLong(2); + files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles)); + s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = " + quoteString(tableObj) + "," + + " \"WNL_PARTITION_OBJ\" = " + quoteString(partitionObj) + "," + + " \"WNL_FILES\" = " + quoteString(files) + "," + + " \"WNL_EVENT_TIME\" = " + now() + + " where \"WNL_ID\" = " + nlId; + LOG.info("Going to execute update <" + s + ">"); + stmt.executeUpdate(sqlGenerator.addEscapeCharacters(s)); + } + } catch (SQLException e) { + LOG.warn("failed to add write notification log" + e.getMessage()); + throw e; + } finally { + closeStmt(stmt); + close(rs); + } + } + static String quoteString(String input) { return "'" + input + "'"; } private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException, SQLException { + LOG.debug("DbNotificationListener: adding notification log for : {}", event.getMessage()); if ((dbConn == null) || (sqlGenerator == null)) { LOG.info("connection or sql generator is not set so executing sql via DN"); process(event, listenerEvent); @@ -669,22 +823,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " + - "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + - " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'"); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("failed to get next NEXT_VAL from SEQUENCE_TABLE"); - } - - long nextNLId = rs.getLong(1); - long updatedNLId = nextNLId + 1; - s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " + - - " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + long nextNLId = getNextNLId(stmt, sqlGenerator, + "org.apache.hadoop.hive.metastore.model.MNotificationLog"); List<String> insert = new ArrayList<>(); @@ -712,20 +852,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener LOG.warn("failed to add notification log" + e.getMessage()); throw e; } finally { - if (stmt != null && !stmt.isClosed()) { - try { - stmt.close(); - } catch (SQLException e) { - LOG.warn("Failed to close statement " + e.getMessage()); - } - } - if (rs != null && !rs.isClosed()) { - try { - rs.close(); - } catch (SQLException e) { - LOG.warn("Failed to close result set " + e.getMessage()); - } - } + closeStmt(stmt); + close(rs); } } @@ -742,12 +870,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener event.getMessage()); HMSHandler.getMSForConf(conf).addNotificationEvent(event); - // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. - if (event.isSetEventId()) { - listenerEvent.putParameter( - MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, - Long.toString(event.getEventId())); - } + // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. 
+ if (event.isSetEventId()) { + listenerEvent.putParameter( + MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, + Long.toString(event.getEventId())); + } } private static class CleanerThread extends Thread { @@ -768,6 +896,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener while (true) { try { rs.cleanNotificationEvents(ttl); + rs.cleanWriteNotificationEvents(ttl); } catch (Exception ex) { //catching exceptions here makes sure that the thread doesn't die in case of unexpected //exceptions http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index abf67a8..b4b118e 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.WMMapping; import org.apache.hadoop.hive.metastore.api.WMPool; import org.apache.hadoop.hive.metastore.api.WMNullablePool; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.thrift.TException; @@ -880,6 +881,20 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public void cleanWriteNotificationEvents(int olderThan) { + if (!shouldEventSucceed) { + //throw exception to simulate an issue with cleaner thread + throw new RuntimeException("Dummy exception while cleaning write notifications"); + } + objectStore.cleanWriteNotificationEvents(olderThan); + } + + @Override + public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException { + return objectStore.getAllWriteEventInfo(txnId, dbName, tableName); + } + + @Override public CurrentNotificationEventId getCurrentNotificationEventId() { return objectStore.getCurrentNotificationEventId(); } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index eef917e..82429e3 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; +import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; import 
org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; @@ -238,6 +239,10 @@ public class TestDbNotificationListener { public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException { pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent); } + + public void onAcidWrite(AcidWriteEvent acidWriteEvent) throws MetaException { + pushEventId(EventType.ACID_WRITE, acidWriteEvent); + } } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 46c623d..c82a933 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -2833,78 +2833,6 @@ public class TestReplicationScenarios { verifyRun("SELECT max(a) from " + replDbName + ".ptned2 where b=1", new String[]{"8"}, driverMirror); } - // TODO: This test should be removed once ACID tables replication is supported. - @Test - public void testSkipTables() throws Exception { - String testName = "skipTables"; - String dbName = createDB(testName, driver); - String replDbName = dbName + "_dupe"; - - // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables. - // If I change it to use proper txn manager, the setup for some tests hangs. - // This used to work by accident, now this works due a test flag. The test needs to be fixed. 
- // Create table - run("CREATE TABLE " + dbName + ".acid_table (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); - run("CREATE TABLE " + dbName + ".mm_table (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'," + - " 'transactional_properties'='insert_only')", driver); - verifyIfTableExist(dbName, "acid_table", metaStoreClient); - verifyIfTableExist(dbName, "mm_table", metaStoreClient); - - // Bootstrap test - Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String replDumpId = bootstrapDump.lastReplId; - verifyIfTableNotExist(replDbName, "acid_table", metaStoreClientMirror); - verifyIfTableNotExist(replDbName, "mm_table", metaStoreClientMirror); - - // Test alter table - run("ALTER TABLE " + dbName + ".acid_table RENAME TO " + dbName + ".acid_table_rename", driver); - verifyIfTableExist(dbName, "acid_table_rename", metaStoreClient); - - // Dummy create table command to mark proper last repl ID after dump - run("CREATE TABLE " + dbName + ".dummy (a int)", driver); - - // Perform REPL-DUMP/LOAD - Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; - verifyIfTableNotExist(replDbName, "acid_table_rename", metaStoreClientMirror); - - // Create another table for incremental repl verification - run("CREATE TABLE " + dbName + ".acid_table_incremental (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); - run("CREATE TABLE " + dbName + ".mm_table_incremental (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'," + - " 'transactional_properties'='insert_only')", driver); - verifyIfTableExist(dbName, "acid_table_incremental", metaStoreClient); - verifyIfTableExist(dbName, "mm_table_incremental", metaStoreClient); - - // Dummy insert into command to mark proper last repl ID after dump - run("INSERT INTO " + dbName + ".dummy values(1)", driver); - - // Perform REPL-DUMP/LOAD - incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName); - replDumpId = incrementalDump.lastReplId; - verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror); - verifyIfTableNotExist(replDbName, "mm_table_incremental", metaStoreClientMirror); - - // Test adding a constraint - run("ALTER TABLE " + dbName + ".acid_table_incremental ADD CONSTRAINT key_pk PRIMARY KEY (key) DISABLE NOVALIDATE", driver); - try { - List<SQLPrimaryKey> pks = metaStoreClient.getPrimaryKeys(new PrimaryKeysRequest(dbName, "acid_table_incremental")); - assertEquals(pks.size(), 1); - } catch (TException te) { - assertNull(te); - } - - // Dummy insert into command to mark proper last repl ID after dump - run("INSERT INTO " + dbName + ".dummy values(2)", driver); - - // Perform REPL-DUMP/LOAD - incrementalLoadAndVerify(dbName, replDumpId, replDbName); - verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror); - } - @Test public void testDeleteStagingDir() throws IOException { String testName = "deleteStagingDir"; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java 
---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 86c0405..8c683cf 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.rules.TestName; + import org.junit.rules.TestRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import javax.annotation.Nullable; +import java.util.Collections; +import com.google.common.collect.Lists; /** * TestReplicationScenariosAcidTables - test replication for ACID tables @@ -66,8 +69,13 @@ public class TestReplicationScenariosAcidTables { protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private static WarehouseInstance primary, replica, replicaNonAcid; - private String primaryDbName, replicatedDbName; private static HiveConf conf; + private String primaryDbName, replicatedDbName, primaryDbNameExtra; + private enum OperationType { + REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS, + REPL_TEST_ACID_INSERT_OVERWRITE, REPL_TEST_ACID_INSERT_IMPORT, REPL_TEST_ACID_INSERT_LOADLOCAL, + REPL_TEST_ACID_INSERT_UNION + } @BeforeClass public static void classLevelSetup() throws Exception { @@ -80,9 +88,13 @@ public class TestReplicationScenariosAcidTables { put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); put("hive.support.concurrency", "true"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.repl.dump.include.acid.tables", "true"); put("hive.metastore.client.capability.check", "false"); put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); }}; primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); @@ -90,7 +102,6 @@ public class TestReplicationScenariosAcidTables { put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); put("hive.support.concurrency", "false"); put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); - put("hive.repl.dump.include.acid.tables", "true"); put("hive.metastore.client.capability.check", "false"); }}; replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); @@ -109,6 +120,9 @@ public class TestReplicationScenariosAcidTables { replicatedDbName = "replicated_" + primaryDbName; primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + primaryDbNameExtra = primaryDbName+"_extra"; + primary.run("create database " + primaryDbNameExtra + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After @@ -116,6 +130,7 @@ public class 
TestReplicationScenariosAcidTables { primary.run("drop database if exists " + primaryDbName + " cascade"); replica.run("drop database if exists " + replicatedDbName + " cascade"); replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade"); + primary.run("drop database if exists " + primaryDbName + "_extra cascade"); } @Test @@ -482,4 +497,585 @@ public class TestReplicationScenariosAcidTables { primary.run("DROP TABLE " + dbName + ".normal"); primary.run("drop database " + dbName); } + + @Test + public void testAcidTableIncrementalReplication() throws Throwable { + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + List<String> selectStmtList = new ArrayList<>(); + List<String[]> expectedValues = new ArrayList<>(); + + appendInsert(selectStmtList, expectedValues); + appendDelete(selectStmtList, expectedValues); + appendUpdate(selectStmtList, expectedValues); + appendTruncate(selectStmtList, expectedValues); + appendInsertIntoFromSelect(selectStmtList, expectedValues); + appendMerge(selectStmtList, expectedValues); + appendCreateAsSelect(selectStmtList, expectedValues); + appendImport(selectStmtList, expectedValues); + appendInsertOverwrite(selectStmtList, expectedValues); + //appendLoadLocal(selectStmtList, expectedValues); + appendInsertUnion(selectStmtList, expectedValues); + appendMultiStatementTxn(selectStmtList, expectedValues); + appendMultiStatementTxnUpdateDelete(selectStmtList, expectedValues); + + verifyIncrementalLoad(selectStmtList, expectedValues, bootStrapDump.lastReplicationId); + } + + private void appendInsert(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testInsert"; + String tableNameMM = tableName + "_MM"; + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + private void appendDelete(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testDelete"; + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + deleteRecords(tableName); + selectStmtList.add("select count(*) from " + tableName); + expectedValues.add(new String[] {"0"}); + } + + private void appendUpdate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testUpdate"; + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + updateRecords(tableName); + selectStmtList.add("select value from " + tableName + " order by value"); + expectedValues.add(new String[] {"1", "100", "100", "100", "100"}); + } + + private void appendTruncate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testTruncate"; + String tableNameMM = tableName + "_MM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + truncateTable(primaryDbName, tableName); + selectStmtList.add("select count(*) 
from " + tableName); + expectedValues.add(new String[] {"0"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + truncateTable(primaryDbName, tableNameMM); + selectStmtList.add("select count(*) from " + tableNameMM); + expectedValues.add(new String[] {"0"}); + } + + private void appendInsertIntoFromSelect(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testInsertIntoFromSelect"; + String tableNameMM =tableName + "_MM"; + String tableNameSelect = testName.getMethodName() + "_Select"; + String tableNameSelectMM = testName.getMethodName() + "_SelectMM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableName, tableNameSelect, false, OperationType.REPL_TEST_ACID_INSERT_SELECT); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameSelect + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableNameMM, tableNameSelectMM, true, OperationType.REPL_TEST_ACID_INSERT_SELECT); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameSelectMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + private void appendMerge(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testMerge"; + String tableNameMerge = testName.getMethodName() + "_Merge"; + + insertForMerge(tableName, tableNameMerge, false); + selectStmtList.add("select last_update_user from " + tableName + " order by last_update_user"); + expectedValues.add(new String[] {"creation", "creation", "creation", "creation", "creation", + "creation", "creation", "merge_update", "merge_insert", "merge_insert"}); + selectStmtList.add("select ID from " + tableNameMerge + " order by ID"); + expectedValues.add(new String[] {"1", "4", "7", "8", "8", "11"}); + } + + private void appendCreateAsSelect(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testCreateAsSelect"; + String tableNameMM = tableName + "_MM"; + String tableNameCTAS = testName.getMethodName() + "_CTAS"; + String tableNameCTASMM = testName.getMethodName() + "_CTASMM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableName, tableNameCTAS, false, OperationType.REPL_TEST_ACID_CTAS); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameCTAS + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableNameMM, tableNameCTASMM, true, OperationType.REPL_TEST_ACID_CTAS); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameCTASMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + private void 
appendImport(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testImport"; + String tableNameMM = tableName + "_MM"; + String tableNameImport = testName.getMethodName() + "_Import"; + String tableNameImportMM = testName.getMethodName() + "_ImportMM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableName, tableNameImport, false, OperationType.REPL_TEST_ACID_INSERT_IMPORT); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameImport + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableNameMM, tableNameImportMM, true, OperationType.REPL_TEST_ACID_INSERT_IMPORT); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameImportMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + private void appendInsertOverwrite(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testInsertOverwrite"; + String tableNameOW = testName.getMethodName() +"_OW"; + String tableNameMM = tableName + "_MM"; + String tableNameOWMM = testName.getMethodName() +"_OWMM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableName, tableNameOW, false, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameOW + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableNameMM, tableNameOWMM, true, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameOWMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + //TODO: need to check why its failing. Loading to acid table from local path is failing. 
+ private void appendLoadLocal(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testLoadLocal"; + String tableNameLL = testName.getMethodName() +"_LL"; + String tableNameMM = tableName + "_MM"; + String tableNameLLMM = testName.getMethodName() +"_LLMM"; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableName, tableNameLL, false, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameLL + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableNameMM, tableNameLLMM, true, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + selectStmtList.add("select key from " + tableNameLLMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + private void appendInsertUnion(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testInsertUnion"; + String tableNameUnion = testName.getMethodName() +"_UNION"; + String tableNameMM = tableName + "_MM"; + String tableNameUnionMM = testName.getMethodName() +"_UNIONMM"; + String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; + String[] resultArrayUnion = new String[]{"1", "1", "2", "2", "3", "3", "4", "4", "5", "5"}; + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableName, tableNameUnion, false, OperationType.REPL_TEST_ACID_INSERT_UNION); + selectStmtList.add("select key from " + tableName + " order by key"); + expectedValues.add(resultArray); + selectStmtList.add( "select key from " + tableNameUnion + " order by key"); + expectedValues.add(resultArrayUnion); + selectStmtList.add("select key from " + tableName + "_nopart" + " order by key"); + expectedValues.add(resultArray); + selectStmtList.add("select key from " + tableNameUnion + "_nopart" + " order by key"); + expectedValues.add(resultArrayUnion); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + insertRecords(tableNameMM, tableNameUnionMM, true, OperationType.REPL_TEST_ACID_INSERT_UNION); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(resultArray); + selectStmtList.add( "select key from " + tableNameUnionMM + " order by key"); + expectedValues.add(resultArrayUnion); + selectStmtList.add("select key from " + tableNameMM + "_nopart" + " order by key"); + expectedValues.add(resultArray); + selectStmtList.add("select key from " + tableNameUnionMM + "_nopart" + " order by key"); + expectedValues.add(resultArrayUnion); + } + + private void appendMultiStatementTxn(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = testName.getMethodName() + "testMultiStatementTxn"; + String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; + String tableNameMM = tableName + "_MM"; + String tableProperty = "'transactional'='true'"; + + insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true); + selectStmtList.add("select key from " + tableName + " order by key"); + 
expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + + tableProperty = setMMtableProperty(tableProperty); + insertIntoDB(primaryDbName, tableNameMM, tableProperty, resultArray, true); + selectStmtList.add("select key from " + tableNameMM + " order by key"); + expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); + } + + private void appendMultiStatementTxnUpdateDelete(List<String> selectStmtList, List<String[]> expectedValues) + throws Throwable { + String tableName = testName.getMethodName() + "testMultiStatementTxnUpdate"; + String tableNameDelete = testName.getMethodName() + "testMultiStatementTxnDelete"; + String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; + String tableProperty = "'transactional'='true'"; + + insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true); + updateRecords(tableName); + selectStmtList.add("select value from " + tableName + " order by value"); + expectedValues.add(new String[] {"1", "100", "100", "100", "100"}); + + insertIntoDB(primaryDbName, tableNameDelete, tableProperty, resultArray, true); + deleteRecords(tableNameDelete); + selectStmtList.add("select count(*) from " + tableNameDelete); + expectedValues.add(new String[] {"0"}); + } + + @Test + public void testReplCM() throws Throwable { + String tableName = testName.getMethodName(); + String tableNameMM = testName.getMethodName() + "_MM"; + String[] result = new String[]{"5"}; + + WarehouseInstance.Tuple incrementalDump; + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + + insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + truncateTable(primaryDbName, tableName); + replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); + verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableName, + "select count(*) from " + tableName + "_nopart"), + Lists.newArrayList(result, result)); + + insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + truncateTable(primaryDbName, tableNameMM); + replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); + verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableNameMM, + "select count(*) from " + tableNameMM + "_nopart"), + Lists.newArrayList(result, result)); + } + + @Test + public void testMultiDBTxn() throws Throwable { + String tableName = testName.getMethodName(); + String dbName1 = tableName + "_db1"; + String dbName2 = tableName + "_db2"; + String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; + String tableProperty = "'transactional'='true'"; + String txnStrStart = "START TRANSACTION"; + String txnStrCommit = "COMMIT"; + + WarehouseInstance.Tuple incrementalDump; + primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')"); + WarehouseInstance.Tuple bootStrapDump = primary.dump("`*`", null); + + primary.run("use " + primaryDbName) + .run("create database " + dbName1 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')") + .run("create database " + 
dbName2 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')") + .run("CREATE TABLE " + dbName1 + "." + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") + .run("use " + dbName1) + .run("SHOW TABLES LIKE '" + tableName + "'") + .verifyResult(tableName) + .run("CREATE TABLE " + dbName2 + "." + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") + .run("use " + dbName2) + .run("SHOW TABLES LIKE '" + tableName + "'") + .verifyResult(tableName) + .run(txnStrStart) + .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)") + .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)") + .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)") + .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)") + .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)") + .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)") + .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)") + .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)") + .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)") + .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)") + .run("select key from " + dbName2 + "." + tableName + " order by key") + .verifyResults(resultArray) + .run("select key from " + dbName1 + "." + tableName + " order by key") + .verifyResults(resultArray) + .run(txnStrCommit); + + incrementalDump = primary.dump("`*`", bootStrapDump.lastReplicationId); + + // Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM + // we are not able to create multiple embedded derby instances for two different MetaStore instances. + primary.run("drop database " + primaryDbName + " cascade"); + primary.run("drop database " + dbName1 + " cascade"); + primary.run("drop database " + dbName2 + " cascade"); + //End of additional steps + + replica.loadWithoutExplain("", bootStrapDump.dumpLocation) + .run("REPL STATUS default") + .verifyResult(bootStrapDump.lastReplicationId); + + replica.loadWithoutExplain("", incrementalDump.dumpLocation) + .run("REPL STATUS " + dbName1) + .run("select key from " + dbName1 + "." + tableName + " order by key") + .verifyResults(resultArray) + .run("select key from " + dbName2 + "." 
+ tableName + " order by key") + .verifyResults(resultArray); + + replica.run("drop database " + primaryDbName + " cascade"); + replica.run("drop database " + dbName1 + " cascade"); + replica.run("drop database " + dbName2 + " cascade"); + } + + private void verifyResultsInReplica(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + for (int idx = 0; idx < selectStmtList.size(); idx++) { + replica.run("use " + replicatedDbName) + .run(selectStmtList.get(idx)) + .verifyResults(expectedValues.get(idx)); + } + } + + private WarehouseInstance.Tuple verifyIncrementalLoad(List<String> selectStmtList, + List<String[]> expectedValues, String lastReplId) throws Throwable { + WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, lastReplId); + replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); + verifyResultsInReplica(selectStmtList, expectedValues); + + replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); + verifyResultsInReplica(selectStmtList, expectedValues); + return incrementalDump; + } + + private void deleteRecords(String tableName) throws Throwable { + primary.run("use " + primaryDbName) + .run("delete from " + tableName) + .run("select count(*) from " + tableName) + .verifyResult("0"); + } + + private void updateRecords(String tableName) throws Throwable { + primary.run("use " + primaryDbName) + .run("update " + tableName + " set value = 100 where key >= 2") + .run("select value from " + tableName + " order by value") + .verifyResults(new String[] {"1", "100", "100", "100", "100"}); + } + + private void truncateTable(String dbName, String tableName) throws Throwable { + primary.run("use " + dbName) + .run("truncate table " + tableName) + .run("select count(*) from " + tableName) + .verifyResult("0") + .run("truncate table " + tableName + "_nopart") + .run("select count(*) from " + tableName + "_nopart") + .verifyResult("0"); + } + + private WarehouseInstance.Tuple verifyLoad(String tableName, String tableNameOp, String lastReplId) throws Throwable { + String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; + if (tableNameOp == null) { + return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key", + "select key from " + tableName + "_nopart order by key"), + Lists.newArrayList(resultArray, resultArray), lastReplId); + } + return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key", + "select key from " + tableNameOp + " order by key", + "select key from " + tableName + "_nopart" + " order by key", + "select key from " + tableNameOp + "_nopart" + " order by key"), + Lists.newArrayList(resultArray, resultArray, resultArray, resultArray), lastReplId); + } + + private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray, boolean isTxn) + throws Throwable { + String txnStrStart = "START TRANSACTION"; + String txnStrCommit = "COMMIT"; + if (!isTxn) { + txnStrStart = "use " + dbName; //dummy + txnStrCommit = "use " + dbName; //dummy + } + primary.run("use " + dbName); + primary.run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") + .run("SHOW TABLES LIKE '" + tableName + "'") + 
+        .verifyResult(tableName)
+        .run("CREATE TABLE " + tableName + "_nopart (key int, value int) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+        .run("SHOW TABLES LIKE '" + tableName + "_nopart'")
+        .run("ALTER TABLE " + tableName + " ADD PARTITION (load_date='2016-03-03')")
+        .run(txnStrStart)
+        .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+        .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+        .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+        .run("INSERT INTO " + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+        .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+        .run("select key from " + tableName + " order by key")
+        .verifyResults(resultArray)
+        .run("INSERT INTO " + tableName + "_nopart (key, value) select key, value from " + tableName)
+        .run("select key from " + tableName + "_nopart" + " order by key")
+        .verifyResults(resultArray)
+        .run(txnStrCommit);
+  }
+
+  private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray)
+      throws Throwable {
+    insertIntoDB(dbName, tableName, tableProperty, resultArray, false);
+  }
+
+  private void insertRecords(String tableName, String tableNameOp, boolean isMMTable,
+      OperationType opType) throws Throwable {
+    insertRecordsIntoDB(primaryDbName, tableName, tableNameOp, isMMTable, opType);
+  }
+
+  private void insertRecordsIntoDB(String DbName, String tableName, String tableNameOp, boolean isMMTable,
+      OperationType opType) throws Throwable {
+    String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+    String tableProperty = "'transactional'='true'";
+    if (isMMTable) {
+      tableProperty = setMMtableProperty(tableProperty);
+    }
+    primary.run("use " + DbName);
+
+    switch (opType) {
+      case REPL_TEST_ACID_INSERT:
+        insertIntoDB(DbName, tableName, tableProperty, resultArray);
+        insertIntoDB(primaryDbNameExtra, tableName, tableProperty, resultArray);
+        return;
+      case REPL_TEST_ACID_INSERT_OVERWRITE:
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
+            .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (2, 2)")
+            .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (10, 12)")
+            .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-02') VALUES (11, 1)")
+            .run("select key from " + tableNameOp + " order by key")
+            .verifyResults(new String[]{"2", "10", "11"})
+            .run("insert overwrite table " + tableNameOp + " select * from " + tableName)
+            .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
+            .run("INSERT INTO " + tableNameOp + "_nopart VALUES (2, 2)")
+            .run("INSERT INTO " + tableNameOp + "_nopart VALUES (10, 12)")
+            .run("INSERT INTO " + tableNameOp + "_nopart VALUES (11, 1)")
+            .run("select key from " + tableNameOp + "_nopart" + " order by key")
+            .verifyResults(new String[]{"2", "10", "11"})
+            .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName + "_nopart")
+            .run("select key from " + tableNameOp + "_nopart" + " order by key");
+        break;
+      case REPL_TEST_ACID_INSERT_SELECT:
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
+            .run("insert into " + tableNameOp + " partition (load_date) select * from " + tableName)
+            .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
+            .run("insert into " + tableNameOp + "_nopart select * from " + tableName + "_nopart");
+        break;
+      case REPL_TEST_ACID_INSERT_IMPORT:
+        String path = "hdfs:///tmp/" + DbName + "/";
+        String exportPath = "'" + path + tableName + "/'";
+        String exportPathNoPart = "'" + path + tableName + "_nopart/'";
+        primary.run("export table " + tableName + " to " + exportPath)
+            .run("import table " + tableNameOp + " from " + exportPath)
+            .run("export table " + tableName + "_nopart to " + exportPathNoPart)
+            .run("import table " + tableNameOp + "_nopart from " + exportPathNoPart);
+        break;
+      case REPL_TEST_ACID_CTAS:
+        primary.run("create table " + tableNameOp + " as select * from " + tableName)
+            .run("create table " + tableNameOp + "_nopart as select * from " + tableName + "_nopart");
+        break;
+      case REPL_TEST_ACID_INSERT_LOADLOCAL:
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+            .run("SHOW TABLES LIKE '" + tableNameOp + "'")
+            .verifyResult(tableNameOp)
+            .run("INSERT OVERWRITE LOCAL DIRECTORY './test.dat' SELECT a.* FROM " + tableName + " a")
+            .run("LOAD DATA LOCAL INPATH './test.dat' OVERWRITE INTO TABLE " + tableNameOp +
+                " PARTITION (load_date='2008-08-15')")
+            .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+            .run("SHOW TABLES LIKE '" + tableNameOp + "_nopart'")
+            .verifyResult(tableNameOp + "_nopart")
+            .run("LOAD DATA LOCAL INPATH './test.dat' OVERWRITE INTO TABLE " + tableNameOp + "_nopart");
+        break;
+      case REPL_TEST_ACID_INSERT_UNION:
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+            .run("SHOW TABLES LIKE '" + tableNameOp + "'")
+            .verifyResult(tableNameOp)
+            .run("insert overwrite table " + tableNameOp + " partition (load_date) select * from " + tableName +
+                " union all select * from " + tableName)
+            .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+            .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName +
+                "_nopart union all select * from " + tableName + "_nopart");
+        resultArray = new String[]{"1", "2", "3", "4", "5", "1", "2", "3", "4", "5"};
+        break;
+      default:
+        return;
+    }
+    primary.run("select key from " + tableNameOp + " order by key").verifyResults(resultArray);
+    primary.run("select key from " + tableNameOp + "_nopart" + " order by key").verifyResults(resultArray);
+  }
+
+  private String setMMtableProperty(String tableProperty) throws Throwable {
+    return tableProperty.concat(", 'transactional_properties' = 'insert_only'");
+  }
+
+  private void insertForMerge(String tableName, String tableNameMerge, boolean isMMTable) throws Throwable {
+    String tableProperty = "'transactional'='true'";
+    if (isMMTable) {
+      tableProperty = setMMtableProperty(tableProperty);
+    }
+    primary.run("use " + primaryDbName)
+        .run("CREATE TABLE " + tableName + "( ID int, TranValue string, last_update_user string) PARTITIONED BY " +
+            "(tran_date string) CLUSTERED BY (ID) into 5 buckets STORED AS ORC TBLPROPERTIES " +
+            " ( "+ tableProperty + " )")
+        .run("SHOW TABLES LIKE '" + tableName + "'")
+        .verifyResult(tableName)
+        .run("CREATE TABLE " + tableNameMerge + " ( ID int, TranValue string, tran_date string) STORED AS ORC ")
+        .run("SHOW TABLES LIKE '" + tableNameMerge + "'")
+        .verifyResult(tableNameMerge)
+        .run("INSERT INTO " + tableName + " PARTITION (tran_date) VALUES (1, 'value_01', 'creation', '20170410')," +
+            " (2, 'value_02', 'creation', '20170410'), (3, 'value_03', 'creation', '20170410'), " +
+            " (4, 'value_04', 'creation', '20170410'), (5, 'value_05', 'creation', '20170413'), " +
+            " (6, 'value_06', 'creation', '20170413'), (7, 'value_07', 'creation', '20170413'), " +
+            " (8, 'value_08', 'creation', '20170413'), (9, 'value_09', 'creation', '20170413'), " +
+            " (10, 'value_10','creation', '20170413')")
+        .run("select ID from " + tableName + " order by ID")
+        .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"})
+        .run("INSERT INTO " + tableNameMerge + " VALUES (1, 'value_01', '20170410'), " +
+            " (4, NULL, '20170410'), (7, 'value_77777', '20170413'), " +
+            " (8, NULL, '20170413'), (8, 'value_08', '20170415'), " +
+            "(11, 'value_11', '20170415')")
+        .run("select ID from " + tableNameMerge + " order by ID")
+        .verifyResults(new String[] {"1", "4", "7", "8", "8", "11"})
+        .run("MERGE INTO " + tableName + " AS T USING " + tableNameMerge + " AS S ON T.ID = S.ID and" +
+            " T.tran_date = S.tran_date WHEN MATCHED AND (T.TranValue != S.TranValue AND S.TranValue " +
+            " IS NOT NULL) THEN UPDATE SET TranValue = S.TranValue, last_update_user = " +
+            " 'merge_update' WHEN MATCHED AND S.TranValue IS NULL THEN DELETE WHEN NOT MATCHED " +
+            " THEN INSERT VALUES (S.ID, S.TranValue,'merge_insert', S.tran_date)")
+        .run("select last_update_user from " + tableName + " order by last_update_user")
+        .verifyResults(new String[] {"creation", "creation", "creation", "creation", "creation",
+            "creation", "creation", "merge_update", "merge_insert", "merge_insert"});
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index ff7f9bc..16c124c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -321,8 +321,7 @@ public class TestReplicationScenariosAcrossInstances {
             "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
         .run("create table table1 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -341,8 +340,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("create table table2 (a int, city string) partitioned by (country string)")
         .run("create table table3 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -467,8 +465,7 @@ public class TestReplicationScenariosAcrossInstances {
             SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbTwo)
         .run("create table t1 (i int, j int)")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     /*
       Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
@@ -527,8 +524,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbOne)
        .run("create table t1 (i int, j int) partitioned by (load_date date) " +
            "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     String dbTwo = primaryDbName + randomTwo;
     WarehouseInstance.Tuple incrementalTuple = primary
@@ -539,8 +535,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbOne)
         .run("create table t2 (a int, b int)")
         .dump("`*`", bootstrapTuple.lastReplicationId,
-            Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-                "'hive.repl.dump.include.acid.tables'='true'"));
+            Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     /*
       Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f666df1..1e3478d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -249,6 +249,11 @@ public class WarehouseInstance implements Closeable {
     return this;
   }
 
+  WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable {
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    return this;
+  }
+
   WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
       throws Throwable {
     String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index f87a6aa..2ba6d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 
 import org.apache.thrift.TException;
 
@@ -109,6 +110,10 @@ public final class SynchronizedMetaStoreClient {
     return client.fireListenerEvent(rqst);
   }
 
+  public synchronized void addWriteNotificationLog(WriteNotificationLogRequest rqst) throws TException {
+    client.addWriteNotificationLog(rqst);
+  }
+
   public synchronized void close() {
     client.close();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 19097f5..bf7749d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -139,7 +139,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
         deletePath = createTargetPath(targetPath, tgtFs);
       }
-      Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+      //For acid table incremental replication, just copy the content of staging directory to destination.
+      //No need to clean it.
+      if (work.isNeedCleanTarget()) {
+        Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+      }
       // Set isManaged to false as this is not load data operation for which it is needed.
       if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
         try {

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3a7f1bc..d095de6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -151,10 +151,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
           continue;
         }
         String destFileName = srcFile.getCmPath().getName();
-        Path destFile = new Path(toPath, destFileName);
+        Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+        Path destFile = new Path(destRoot, destFileName);
         if (dstFs.exists(destFile)) {
           String destFileWithSourceName = srcFile.getSourcePath().getName();
-          Path newDestFile = new Path(toPath, destFileWithSourceName);
+          Path newDestFile = new Path(destRoot, destFileWithSourceName);
           boolean result = dstFs.rename(destFile, newDestFile);
           if (!result) {
             throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 5bbc25a..c2953c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
@@ -60,8 +62,19 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
         return 0;
       }
     } catch (InvalidTableException e) {
-      LOG.info("Table does not exist so, ignoring the operation as it might be a retry(idempotent) case.");
-      return 0;
+      // In scenarios like import to mm tables, the alloc write id event is generated before create table event.
+      try {
+        Database database = Hive.get().getDatabase(work.getDbName());
+        if (!replicationSpec.allowReplacementInto(database.getParameters())) {
+          // if the event is already replayed, then no need to replay it again.
+          LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
+              replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType());
+          return 0;
+        }
+      } catch (HiveException e1) {
+        LOG.error("Get database failed with exception " + e1.getMessage());
+        return 1;
+      }
     } catch (HiveException e) {
       LOG.error("Get table failed with exception " + e.getMessage());
       return 1;
@@ -85,10 +98,16 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
       }
       return 0;
     case REPL_COMMIT_TXN:
-      for (long txnId : work.getTxnIds()) {
-        txnManager.replCommitTxn(replPolicy, txnId);
-        LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " with srcTxn " + txnId);
-      }
+      // Currently only one commit txn per event is supported.
+      assert (work.getTxnIds().size() == 1);
+
+      long txnId = work.getTxnIds().get(0);
+      CommitTxnRequest commitTxnRequest = new CommitTxnRequest(txnId);
+      commitTxnRequest.setReplPolicy(work.getReplPolicy());
+      commitTxnRequest.setWriteEventInfos(work.getWriteEventInfos());
+      txnManager.replCommitTxn(commitTxnRequest);
+      LOG.info("Replayed CommitTxn Event for replPolicy: " + replPolicy + " with srcTxn: " + txnId +
+          "WriteEventInfos: " + work.getWriteEventInfos());
       return 0;
     case REPL_ALLOC_WRITE_ID:
       assert work.getTxnToWriteIdList() != null;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index e48657c..82ecad1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -199,7 +199,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           cmRoot,
           getHive(),
           conf,
-          getNewEventOnlyReplicationSpec(ev.getEventId())
+          getNewEventOnlyReplicationSpec(ev.getEventId()),
+          work.dbNameOrPattern,
+          work.tableNameOrPattern
       );
       EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
       eventHandler.handle(context);
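For readers following the REPL_COMMIT_TXN change in ReplTxnTask above: the replay now builds a single CommitTxnRequest that carries both the replication policy and the write-event descriptors captured on the source, so the target metastore can map the source transaction to its own open transaction and record matching write notifications in one commit call. The fragment below is only an illustrative sketch of that call sequence, not code from this patch; the helper name replayCommit is made up, the WriteEventInfo element type and the LockException thrown by replCommitTxn are assumptions based on the surrounding Hive APIs, and the three arguments stand in for the values ReplTxnWork exposes via getTxnIds().get(0), getReplPolicy() and getWriteEventInfos() in the hunk above.

import java.util.List;

import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

public final class CommitTxnReplaySketch {

  private CommitTxnReplaySketch() {
  }

  // Sketch: replays one committed source transaction on the target cluster.
  // srcTxnId, replPolicy and writeEventInfos correspond to the values pulled
  // out of ReplTxnWork in the REPL_COMMIT_TXN branch above.
  public static void replayCommit(HiveTxnManager txnManager, long srcTxnId, String replPolicy,
      List<WriteEventInfo> writeEventInfos) throws LockException {
    CommitTxnRequest request = new CommitTxnRequest(srcTxnId);
    // The repl policy lets the metastore resolve the source txn id to the
    // corresponding target txn before committing it.
    request.setReplPolicy(replPolicy);
    // Write event infos describe the files written by the source transaction,
    // so the target can log equivalent write notifications at commit time.
    request.setWriteEventInfos(writeEventInfos);
    txnManager.replCommitTxn(request);
  }
}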