http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 9e0ce82..5e113c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -179,7 +179,7 @@ public class IncrementalLoadTasksBuilder {
     Database database;
     try {
       database = Hive.get().getDatabase(dbName);
-      return isEventNotReplayed(database.getParameters(), dir, dumpType);
+      return database == null ? true : isEventNotReplayed(database.getParameters(), dir, dumpType);
     } catch (HiveException e) {
       // maybe the db is getting created in this load
       log.debug("failed to get the database " + dbName);
@@ -255,50 +255,55 @@ public class IncrementalLoadTasksBuilder {
     return updateReplIdTask;
   }
 
-  private List<Task<? extends Serializable>> addUpdateReplStateTasks(boolean isDatabaseLoad,
-      UpdatedMetaDataTracker updatedMetadata,
-      List<Task<? extends Serializable>> importTasks) throws SemanticException {
-    String replState = updatedMetadata.getReplicationState();
-    String database = updatedMetadata.getDatabase();
-    String table = updatedMetadata.getTable();
-
-    // If no import tasks generated by the event or no table updated for table level load, then no
-    // need to update the repl state to any object.
-    if (importTasks.isEmpty() || (!isDatabaseLoad && (table == null))) {
-      log.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+      boolean isDatabaseLoad,
+      UpdatedMetaDataTracker updatedMetaDataTracker,
+      List<Task<? extends Serializable>> importTasks) throws SemanticException {
+    // If no import tasks were generated by the event, then no need to update the repl state of any object.
+    if (importTasks.isEmpty()) {
+      log.debug("No objects need update of repl state: 0 import tasks");
       return importTasks;
     }
 
     // Create a barrier task for dependency collection of import tasks
-    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
-
-    // Link import tasks to the barrier task, which will in turn be linked with repl state update tasks
-    for (Task<? extends Serializable> t : importTasks) {
-      t.addDependentTask(barrierTask);
-      log.debug("Added {}:{} as a precursor of barrier task {}:{}",
-          t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
-    }
-
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
     List<Task<? extends Serializable>> tasks = new ArrayList<>();
     Task<? extends Serializable> updateReplIdTask;
 
-    // If any partition is updated, then update repl state in partition object
-    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
-      updateReplIdTask = tableUpdateReplStateTask(database, table, partSpec, replState, barrierTask);
-      tasks.add(updateReplIdTask);
+    for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) {
+      String replState = updateMetaData.getReplState();
+      String dbName = updateMetaData.getDbName();
+      String tableName = updateMetaData.getTableName();
+      // If any partition is updated, then update repl state in partition object
+      for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) {
+        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+
+      if (tableName != null) {
+        // If any table/partition is updated, then update repl state in table object
+        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+
+      // For table level load, need not update replication state for the database
+      if (isDatabaseLoad) {
+        // If any table/partition is updated, then update repl state in db object
+        updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
     }
 
-    if (table != null) {
-      // If any table/partition is updated, then update repl state in table object
-      updateReplIdTask = tableUpdateReplStateTask(database, table, null, replState, barrierTask);
-      tasks.add(updateReplIdTask);
+    if (tasks.isEmpty()) {
+      log.debug("No objects need update of repl state: 0 update tracker tasks");
+      return importTasks;
     }
 
-    // For table level load, need not update replication state for the database
-    if (isDatabaseLoad) {
-      // If any table/partition is updated, then update repl state in db object
-      updateReplIdTask = dbUpdateReplStateTask(database, replState, barrierTask);
-      tasks.add(updateReplIdTask);
+    // Link import tasks to the barrier task, which will in turn be linked with repl state update tasks
+    for (Task<? extends Serializable> t : importTasks) {
+      t.addDependentTask(barrierTask);
+      log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+          t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
     }
 
     // At least one task would have been added to update the repl state
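
The rewiring above builds the repl-state update tasks first and only links the import tasks to the barrier once it is known that something will run after it, so the barrier is never left dangling. A minimal self-contained sketch of that barrier pattern follows; the Node class and task names are hypothetical stand-ins, not Hive's Task/TaskFactory/DependencyCollectionWork API:

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hive's task DAG: a node runs once all parents finish.
class Node {
  final String name;
  final List<Node> children = new ArrayList<>();
  Node(String name) { this.name = name; }
  void addDependentTask(Node child) { children.add(child); }
}

public class BarrierSketch {
  public static void main(String[] args) {
    List<Node> importTasks = List.of(new Node("import-1"), new Node("import-2"));
    List<Node> updateTasks = List.of(new Node("update-table-replid"), new Node("update-db-replid"));
    Node barrier = new Node("barrier"); // plays the DependencyCollectionWork role

    // Mirror of the patched ordering: collect the update tasks first, then wire
    // the barrier in only if there is something to run after it.
    if (!updateTasks.isEmpty()) {
      for (Node t : importTasks) {
        t.addDependentTask(barrier); // every import precedes the barrier
      }
      for (Node u : updateTasks) {
        barrier.addDependentTask(u); // every update waits on the barrier
      }
    }
    System.out.println("barrier has " + barrier.children.size() + " dependents");
  }
}

The shape is the same as in the patch: importTasks feed barrierTask, barrierTask feeds the update tasks, and the isEmpty() check decides whether the barrier gets wired at all.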
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 7fce67f..16ba82e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -71,21 +70,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; -import java.io.Serializable; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Pattern; - -import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; - /** * Utilities that are shared by all of the ACID input and output formats. They @@ -1907,6 +1892,28 @@ public class AcidUtils { return null; } + //Get the first level acid directory (if any) from a given path + public static String getFirstLevelAcidDirPath(Path dataPath, FileSystem fileSystem) throws IOException { + if (dataPath == null) { + return null; + } + String firstLevelAcidDir = getAcidSubDir(dataPath); + if (firstLevelAcidDir != null) { + return firstLevelAcidDir; + } + + String acidDirPath = getFirstLevelAcidDirPath(dataPath.getParent(), fileSystem); + if (acidDirPath == null) { + return null; + } + + // We need the path for directory so no need to append file name + if (fileSystem.isDirectory(dataPath)) { + return acidDirPath + Path.SEPARATOR + dataPath.getName(); + } + return acidDirPath; + } + public static boolean isAcidEnabled(HiveConf hiveConf) { String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index bcc0508..ec8527e 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -26,30 +26,32 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hive.common.util.Ref; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidTxnWriteIdList; -import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.llap.io.api.LlapIo; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; @@ -62,8 +64,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; -import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; -import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; @@ -78,10 +78,7 @@ import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.StringUtils; -import org.apache.hive.common.util.Ref; import org.apache.hive.common.util.ReflectionUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * HiveInputFormat is a parameterized InputFormat which looks at the path name @@ -460,8 +457,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits, TableDesc table, List<InputSplit> result) throws IOException { + String tableName = table.getTableName(); ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList( - conf, table.getTableName()); + conf, tableName == null ? 
null : HiveStringUtils.normalizeIdentifier(tableName)); ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList); try { http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 4fd1d4e..78980fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException; import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.Context; @@ -638,14 +639,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } @Override - public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException { + public void replCommitTxn(CommitTxnRequest rqst) throws LockException { try { - getMS().replCommitTxn(srcTxnId, replPolicy); + getMS().replCommitTxn(rqst); } catch (NoSuchTxnException e) { - LOG.error("Metastore could not find " + JavaUtils.txnIdToString(srcTxnId)); - throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(srcTxnId)); + LOG.error("Metastore could not find " + JavaUtils.txnIdToString(rqst.getTxnid())); + throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(rqst.getTxnid())); } catch (TxnAbortedException e) { - LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), e.getMessage()); + LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, + JavaUtils.txnIdToString(rqst.getTxnid()), e.getMessage()); LOG.error(le.getMessage()); throw le; } catch (TException e) { @@ -1013,7 +1015,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl { assert isTxnOpen(); return stmtId++; } - + @Override + public int getCurrentStmtId() { + assert isTxnOpen(); + return stmtId; + } @Override public long getTableWriteId(String dbName, String tableName) throws LockException { assert isTxnOpen(); http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index ab9d67e..1feddeb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,10 @@ class DummyTxnManager extends HiveTxnManagerImpl { return 0; } @Override + public int getCurrentStmtId() { + return 0; + } + @Override public long 
getTableWriteId(String dbName, String tableName) throws LockException { return 0L; } @@ -220,7 +225,7 @@ class DummyTxnManager extends HiveTxnManagerImpl { } @Override - public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException { + public void replCommitTxn(CommitTxnRequest rqst) throws LockException { // No-op } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 5f68e08..9575552 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.ql.Context; @@ -61,11 +62,11 @@ public interface HiveTxnManager { /** * Commit the transaction in target cluster. - * @param replPolicy Replication policy to uniquely identify the source cluster. - * @param srcTxnId The id of the transaction at the source cluster + * + * @param rqst Commit transaction request having information related to commit txn and write events. * @throws LockException in case of failure to commit the transaction. */ - void replCommitTxn(String replPolicy, long srcTxnId) throws LockException; + void replCommitTxn(CommitTxnRequest rqst) throws LockException; /** * Abort the transaction in target cluster. @@ -295,6 +296,9 @@ public interface HiveTxnManager { */ int getStmtIdAndIncrement(); + // Can be used by operation to set the stmt id when allocation is done somewhere else. + int getCurrentStmtId(); + /** * Acquire the materialization rebuild lock for a given view. We need to specify the fully * qualified name of the materialized view and the open transaction ID so we can identify http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index c3809d8..c2ffe02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -159,6 +159,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; @@ -1719,6 +1720,13 @@ public class Hive { List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>()); perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES); + + // If config is set, table is not temporary and partition being inserted exists, capture + // the list of files added. 
For not yet existing partitions (insert overwrite to new partition + // or dynamic partition inserts), the add partition event will capture the list of files added. + if (areEventsForDmlNeeded(tbl, oldPart)) { + newFiles = Collections.synchronizedList(new ArrayList<Path>()); + } // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic // like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining @@ -1731,8 +1739,8 @@ public class Hive { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); } assert !isAcidIUDoperation; - if (areEventsForDmlNeeded(tbl, oldPart)) { - newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId); + if (newFiles != null) { + listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTableWrite ? isInsertOverwrite : false, newFiles); } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath @@ -1781,8 +1789,15 @@ public class Hive { // or dynamic partition inserts), the add partition event will capture the list of files added. // Generate an insert event only if inserting into an existing partition // When inserting into a new partition, the add partition event takes care of insert event - if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) { - fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); + if ((null != oldPart) && (null != newFiles)) { + if (isTxnTable) { + addWriteNotificationLog(tbl, partSpec, newFiles, writeId); + } else { + fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); + } + } else { + LOG.debug("No new files were created, and is not a replace, or we're inserting into a " + + "partition that does not exist yet. Skipping generating INSERT event."); } // column stats will be inaccurate @@ -1852,6 +1867,12 @@ public class Hive { } throw e; } + + // For acid table, add the acid_write event with file list at the time of load itself. But + // it should be done after partition is created. + if (isTxnTable && (null != newFiles)) { + addWriteNotificationLog(tbl, partSpec, newFiles, writeId); + } } else { setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart); } @@ -1906,50 +1927,47 @@ public class Hive { } private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) { - return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null; + // For Acid IUD, add partition is a meta data only operation. So need to add the new files added + // information into the TXN_WRITE_NOTIFICATION_LOG table. + return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && + ((null != oldPart) || AcidUtils.isTransactionalTable(tbl)); + } + + private void listFilesInsideAcidDirectory(Path acidDir, FileSystem srcFs, List<Path> newFiles) throws IOException { + // list out all the files/directory in the path + FileStatus[] acidFiles; + acidFiles = srcFs.listStatus(acidDir); + if (acidFiles == null) { + LOG.debug("No files added by this query in: " + acidDir); + return; + } + for (FileStatus acidFile : acidFiles) { + // need to list out only files, ignore folders. 
+ if (!acidFile.isDirectory()) { + newFiles.add(acidFile.getPath()); + } else { + listFilesInsideAcidDirectory(acidFile.getPath(), srcFs, newFiles); + } + } } - private List<Path> listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId) throws HiveException { - List<Path> newFiles = new ArrayList<Path>(); - final String filePrefix = AcidUtils.deltaSubdir(writeId, writeId, stmtId); - FileStatus[] srcs; - FileSystem srcFs; + private void listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId, + boolean isInsertOverwrite, List<Path> newFiles) throws HiveException { + Path acidDir = new Path(loadPath, AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId)); try { - srcFs = loadPath.getFileSystem(conf); - srcs = srcFs.listStatus(loadPath); + FileSystem srcFs = loadPath.getFileSystem(conf); + if (srcFs.exists(acidDir) && srcFs.isDirectory(acidDir)){ + // list out all the files in the path + listFilesInsideAcidDirectory(acidDir, srcFs, newFiles); + } else { + LOG.info("directory does not exist: " + acidDir); + return; + } } catch (IOException e) { LOG.error("Error listing files", e); throw new HiveException(e); } - if (srcs == null) { - LOG.info("No sources specified: " + loadPath); - return newFiles; - } - PathFilter subdirFilter = null; - - // Note: just like the move path, we only do one level of recursion. - for (FileStatus src : srcs) { - if (src.isDirectory()) { - if (subdirFilter == null) { - subdirFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().startsWith(filePrefix); - } - }; - } - try { - for (FileStatus srcFile : srcFs.listStatus(src.getPath(), subdirFilter)) { - newFiles.add(srcFile.getPath()); - } - } catch (IOException e) { - throw new HiveException(e); - } - } else if (src.getPath().getName().startsWith(filePrefix)) { - newFiles.add(src.getPath()); - } - } - return newFiles; + return; } private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl, @@ -2301,13 +2319,17 @@ private void constructOneLBLocationMap(FileStatus fSta, PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE); - List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>()); + List<Path> newFiles = null; Table tbl = getTable(tableName); assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName(); boolean isTxnTable = AcidUtils.isTransactionalTable(tbl); boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl); boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl); + if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { + newFiles = Collections.synchronizedList(new ArrayList<Path>()); + } + // Note: this assumes both paths are qualified; which they are, currently. if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) { /** @@ -2319,7 +2341,11 @@ private void constructOneLBLocationMap(FileStatus fSta, Utilities.FILE_OP_LOGGER.debug( "not moving " + loadPath + " to " + tbl.getPath() + " (MM)"); } - newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId); + + //new files list is required only for event notification. + if (newFiles != null) { + listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTable ? isInsertOverwrite : false, newFiles); + } } else { // Either a non-MM query, or a load into MM table from an external source. 
Path tblPath = tbl.getPath(); @@ -2390,10 +2416,10 @@ private void constructOneLBLocationMap(FileStatus fSta, alterTable(tbl, environmentContext); - if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) { - fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); + if (AcidUtils.isTransactionalTable(tbl)) { + addWriteNotificationLog(tbl, null, newFiles, writeId); } else { - fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null); + fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles); } perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_TABLE); @@ -2647,6 +2673,48 @@ private void constructOneLBLocationMap(FileStatus fSta, tpart.getSd().setLocation(partPath); } + private void addWriteNotificationLog(Table tbl, Map<String, String> partitionSpec, + List<Path> newFiles, Long writeId) throws HiveException { + if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { + LOG.debug("write notification log is ignored as dml event logging is disabled"); + return; + } + + if (tbl.isTemporary()) { + LOG.debug("write notification log is ignored as " + tbl.getTableName() + " is temporary : " + writeId); + return; + } + + if (newFiles == null || newFiles.isEmpty()) { + LOG.debug("write notification log is ignored as file list is empty"); + return; + } + + LOG.debug("adding write notification log for operation " + writeId + " table " + tbl.getCompleteName() + + "partition " + partitionSpec + " list of files " + newFiles); + + try { + FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf); + Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + + InsertEventRequestData insertData = new InsertEventRequestData(); + insertData.setReplace(true); + + WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(txnId, writeId, + tbl.getDbName(), tbl.getTableName(), insertData); + addInsertFileInformation(newFiles, fileSystem, insertData); + + if (partitionSpec != null && !partitionSpec.isEmpty()) { + for (FieldSchema fs : tbl.getPartitionKeys()) { + rqst.addToPartitionVals(partitionSpec.get(fs.getName())); + } + } + getSynchronizedMSC().addWriteNotificationLog(rqst); + } catch (IOException | TException e) { + throw new HiveException(e); + } + } + private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles) throws HiveException { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { @@ -2723,6 +2791,7 @@ private void constructOneLBLocationMap(FileStatus fSta, InsertEventRequestData insertData) throws IOException { insertData.addToFilesAdded(p.toString()); FileChecksum cksum = fileSystem.getFileChecksum(p); + String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(p.getParent(), fileSystem); // File checksum is not implemented for local filesystem (RawLocalFileSystem) if (cksum != null) { String checksumString = @@ -2732,6 +2801,11 @@ private void constructOneLBLocationMap(FileStatus fSta, // Add an empty checksum string for filesystems that don't generate one insertData.addToFilesAddedChecksum(""); } + + // acid dir will be present only for acid write operations. 
+ if (acidDirPath != null) { + insertData.addToSubDirectoryList(acidDirPath); + } } public boolean dropPartition(String tblName, List<String> part_vals, boolean deleteData) @@ -3690,7 +3764,6 @@ private void constructOneLBLocationMap(FileStatus fSta, @Override public Void call() throws HiveException { SessionState.setCurrentSessionState(parentSession); - final String group = srcStatus.getGroup(); try { boolean success = false; if (destFs instanceof DistributedFileSystem) { http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java index f1c4d98..e04a0f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.metadata; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -431,11 +432,19 @@ public final class HiveUtils { public static String getReplPolicy(String dbName, String tableName) { if ((dbName == null) || (dbName.isEmpty())) { - return null; + return "*.*"; } else if ((tableName == null) || (tableName.isEmpty())) { return dbName.toLowerCase() + ".*"; } else { return dbName.toLowerCase() + "." + tableName.toLowerCase(); } } + + public static Path getDumpPath(Path root, String dbName, String tableName) { + assert (dbName != null); + if ((tableName != null) && (!tableName.isEmpty())) { + return new Path(root, dbName + "." 
+ tableName); + } + return new Path(root, dbName); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index d34de61..eb594f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -249,9 +250,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { throw new HiveException(e); } + boolean inReplicationScope = false; if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ tblDesc.setReplicationSpec(replicationSpec); StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE); + inReplicationScope = true; } if (isExternalSet) { @@ -275,7 +278,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { for (Partition partition : partitions) { // TODO: this should ideally not create AddPartitionDesc per partition AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition); - if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ + if (inReplicationScope){ StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE); } partitionDescs.add(partsDesc); @@ -335,13 +338,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { //if importing into existing transactional table or will create a new transactional table //(because Export was done from transactional table), need a writeId // Explain plan doesn't open a txn and hence no need to allocate write id. - if (x.getCtx().getExplainConfig() == null) { + // In replication flow, no need to allocate write id. It will be allocated using the alloc write id event. + if (x.getCtx().getExplainConfig() == null && !inReplicationScope) { writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName()); stmtId = txnMgr.getStmtIdAndIncrement(); } } - if (!replicationSpec.isInReplicationScope()) { + if (!inReplicationScope) { createRegularImportTasks( tblDesc, partitionDescs, isPartSpecSet, replicationSpec, table, @@ -390,7 +394,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); Path destPath = null, loadPath = null; LoadFileType lft; - if (AcidUtils.isTransactionalTable(table)) { + if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { String mmSubdir = replace ? 
AcidUtils.baseDir(writeId) : AcidUtils.deltaSubdir(writeId, writeId, stmtId); destPath = new Path(tgtPath, mmSubdir); @@ -428,13 +432,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } - LoadTableDesc loadTableWork = new LoadTableDesc( - loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId); - loadTableWork.setStmtId(stmtId); + MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false); + + + if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) { + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(destPath), + Collections.singletonList(tgtPath), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); + moveWork.setNeedCleanTarget(false); + } else { + LoadTableDesc loadTableWork = new LoadTableDesc( + loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId); + loadTableWork.setStmtId(stmtId); + moveWork.setLoadTableWork(loadTableWork); + } + //if Importing into existing table, FileFormat is checked by // ImportSemanticAnalzyer.checked checkTable() - MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false); - Task<?> loadTableTask = TaskFactory.get(mv, x.getConf()); + Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); return loadTableTask; @@ -498,8 +515,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - Path destPath = !AcidUtils.isTransactionalTable(table.getParameters()) ? - x.getCtx().getExternalTmpPath(tgtLocation) + //Replication scope the write id will be invalid + Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || + replicationSpec.isInReplicationScope(); + Path destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation) : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation; if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { @@ -523,17 +542,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Task<?> addPartTask = TaskFactory.get( new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); + MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), + null, null, false); + // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import? // See setLoadFileType and setIsAcidIow calls elsewhere for an example. - LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), - partSpec.getPartSpec(), - replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, - writeId); - loadTableWork.setStmtId(stmtId); - loadTableWork.setInheritTableSpecs(false); - Task<?> loadPartTask = TaskFactory.get( - new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false), - x.getConf()); + if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) { + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(destPath), + Collections.singletonList(tgtLocation), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); + moveWork.setNeedCleanTarget(false); + } else { + LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), + partSpec.getPartSpec(), + replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, + writeId); + loadTableWork.setStmtId(stmtId); + loadTableWork.setInheritTableSpecs(false); + moveWork.setLoadTableWork(loadTableWork); + } + + Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); x.getTasks().add(copyTask); @@ -1005,7 +1036,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { t.addDependentTask( addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { - updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } } } else { @@ -1057,13 +1089,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { x.getTasks().add(addSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { - updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x)); if (updatedMetadata != null) { - updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } } } else { @@ -1078,7 +1112,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); } if (updatedMetadata != null) { - updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 576f337..1271799 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7309,7 +7309,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer { } try { if (ctx.getExplainConfig() != null) { - writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id + writeId = null; // For explain plan, txn won't be opened and doesn't make sense to allocate write id } else { if (isMmTable) { writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName()); @@ -7324,6 +7324,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean isReplace = !qb.getParseInfo().isInsertIntoTable( dest_tab.getDbName(), dest_tab.getTableName()); ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId); + if (writeId != null) { + ltd.setStmtId(txnMgr.getCurrentStmtId()); + } // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up boolean isInsertInto = qb.getParseInfo().isInsertIntoTable( @@ -7419,6 +7422,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { throw new SemanticException("Failed to allocate write Id", ex); } ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId); + if (writeId != null) { + ltd.setStmtId(txnMgr.getCurrentStmtId()); + } // For the current context for generating File Sink Operator, it is either INSERT INTO or INSERT OVERWRITE. // So the next line works. boolean isInsertInto = !qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest); http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index ce7e65a..8df2904 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -179,9 +179,23 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer { String newTableName = getTmptTableNameForExport(exportTable); //this is db.table Map<String, String> tblProps = new HashMap<>(); tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString()); + String location; + + // for temporary tables we set the location to something in the session's scratch dir + // it has the same life cycle as the tmp table + try { + // Generate a unique ID for temp table path. + // This path will be fixed for the life of the temp table. 
+ Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString()); + path = Warehouse.getDnsPath(path, conf); + location = path.toString(); + } catch (MetaException err) { + throw new SemanticException("Error while generating temp table path:", err); + } + CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName, false, true, null, - null, null, null, null, + null, location, null, null, tblProps, true, //important so we get an exception on name collision Warehouse.getQualifiedName(exportTable.getTTable()), false); http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 61bf6b9..75dcaa3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -400,7 +400,7 @@ public class CopyUtils { return result; } - private Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) { + public static Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) { if (fileInfo.getSubDir() == null) { return destRoot; } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index c0701c5..62d699f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -186,10 +186,6 @@ public class Utils { return false; } - boolean isAcidTable = AcidUtils.isTransactionalTable(tableHandle); - if (isAcidTable) { - return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES); - } return !tableHandle.isTemporary(); } return true; http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index db97d7c..f04cd93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -18,9 +18,27 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.WriteEventInfo; +import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; import 
org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.fs.FileSystem; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; class CommitTxnHandler extends AbstractEventHandler { @@ -28,11 +46,116 @@ class CommitTxnHandler extends AbstractEventHandler { super(event); } + private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException { + Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); + FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf); + return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + } + + private void writeDumpFiles(Context withinContext, Iterable<String> files, Path dataPath) throws IOException { + // encoded filename/checksum of files, write into _files + try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { + for (String file : files) { + fileListWriter.write(file + "\n"); + } + } + } + + private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, + List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException { + if (fileListArray == null || fileListArray.isEmpty()) { + return; + } + + Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); + withinContext.replicationSpec.setIsReplace(true); + EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, + qlMdTable, qlPtns, + withinContext.replicationSpec, + withinContext.hiveConf); + + if ((null == qlPtns) || qlPtns.isEmpty()) { + Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); + writeDumpFiles(withinContext, fileListArray.get(0), dataPath); + } else { + for (int idx = 0; idx < qlPtns.size(); idx++) { + Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName()); + writeDumpFiles(withinContext, fileListArray.get(idx), dataPath); + } + } + } + + private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, + List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException { + Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName()); + Context context = new Context(withinContext); + context.setEventRoot(newPath); + createDumpFile(context, qlMdTable, qlPtns, fileListArray); + } + @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage()); + String payload = event.getMessage(); + + if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage()); + + String contextDbName = withinContext.dbName == null ? null : + StringUtils.normalizeIdentifier(withinContext.dbName); + String contextTableName = withinContext.tableName == null ? null : + StringUtils.normalizeIdentifier(withinContext.tableName); + List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf). + getAllWriteEventInfo(commitTxnMessage.getTxnId(), contextDbName, contextTableName); + int numEntry = (writeEventInfoList != null ? 
writeEventInfoList.size() : 0); + if (numEntry != 0) { + commitTxnMessage.addWriteEventInfo(writeEventInfoList); + payload = commitTxnMessage.toString(); + LOG.debug("payload for commit txn event : " + payload); + } + + org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null; + org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null; + List<Partition> qlPtns = new ArrayList<>(); + List<List<String>> filesTobeAdded = new ArrayList<>(); + + // The below loop creates dump directory for each table. It reads through the list of write notification events, + // groups the entries per table and creates the lists of files to be replicated. The event directory in the dump + // path will have subdirectory for each table. This folder will have metadata for the table and the list of files + // to be replicated. The entries are added in the table with txn id, db name,table name, partition name + // combination as primary key, so the entries with same table will come together. Only basic table metadata is + // used during import, so we need not dump the latest table metadata. + for (int idx = 0; idx < numEntry; idx++) { + qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(commitTxnMessage.getTableObj(idx)); + if (qlMdTablePrev == null) { + qlMdTablePrev = qlMdTable; + } + + // one dump directory per table + if (!qlMdTablePrev.getCompleteName().equals(qlMdTable.getCompleteName())) { + createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded); + qlPtns = new ArrayList<>(); + filesTobeAdded = new ArrayList<>(); + qlMdTablePrev = qlMdTable; + } + + if (qlMdTable.isPartitioned() && (null != commitTxnMessage.getPartitionObj(idx))) { + qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, + commitTxnMessage.getPartitionObj(idx))); + } + + filesTobeAdded.add(Lists.newArrayList( + ReplChangeManager.getListFromSeparatedString(commitTxnMessage.getFiles(idx)))); + } + + //Dump last table in the list + if (qlMdTablePrev != null) { + createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded); + } + } + DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); + dmd.setPayload(payload); dmd.write(); } http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index c0fa7b2..ec35f4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -35,18 +35,37 @@ public interface EventHandler { DumpType dumpType(); class Context { - final Path eventRoot, cmRoot; + Path eventRoot; + final Path cmRoot; final Hive db; final HiveConf hiveConf; final ReplicationSpec replicationSpec; + final String dbName; + final String tableName; public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, - ReplicationSpec replicationSpec) { + ReplicationSpec replicationSpec, String dbName, String tableName) { this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; this.hiveConf = hiveConf; this.replicationSpec = replicationSpec; + this.dbName = dbName; + this.tableName = tableName; + } + + public Context(Context other) { + this.eventRoot = other.eventRoot; + this.cmRoot = other.cmRoot; + this.db 
= other.db;
+    this.hiveConf = other.hiveConf;
+    this.replicationSpec = other.replicationSpec;
+    this.dbName = other.dbName;
+    this.tableName = other.tableName;
+  }
+
+  public void setEventRoot(Path eventRoot) {
+    this.eventRoot = eventRoot;
+  }

   DumpMetaData createDmd(EventHandler eventHandler) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 5ac3af0..cf3822a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -53,6 +54,9 @@
       return;
     }

+    // For ACID tables, an insert event should never have been fired.
+    assert(!AcidUtils.isTransactionalTable(qlMdTable));
+
     List<Partition> qlPtns = null;
     if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
       qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
index d76f419..614e071 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load;

+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hive.common.util.HiveStringUtils;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.List;

@@ -25,52 +28,113 @@ import java.util.List;
  * Utility class to help track and return the metadata which are updated by repl load
  */
 public class UpdatedMetaDataTracker {
-  private String replState;
-  private String dbName;
-  private String tableName;
-  private List<Map <String, String>> partitionsList;

-  public UpdatedMetaDataTracker() {
-    this.replState = null;
-    this.dbName = null;
-    this.tableName = null;
-    this.partitionsList = new ArrayList<>();
+  /**
+   * Utility class to store the replication state of a table.
+   */
+  public static class UpdateMetaData {
+    private String replState;
+    private String dbName;
+    private String tableName;
+    private List<Map <String, String>> partitionsList;
+
+    UpdateMetaData(String replState, String dbName, String tableName, Map <String, String> partSpec) {
+      this.replState = replState;
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.partitionsList = new ArrayList<>();
+      if (partSpec != null) {
+        this.partitionsList.add(partSpec);
+      }
+    }
+
+    public String getReplState() {
+      return replState;
+    }
+
+    public String getDbName() {
+      return dbName;
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public List<Map <String, String>> getPartitionsList() {
+      return partitionsList;
+    }
+
+    public void addPartition(Map<String, String> partSpec) {
+      this.partitionsList.add(partSpec);
+    }
   }

-  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
-    this.replState = other.replState;
-    this.dbName = other.dbName;
-    this.tableName = other.tableName;
-    this.partitionsList = other.getPartitions();
+  private List<UpdateMetaData> updateMetaDataList;
+  private Map<String, Integer> updateMetaDataMap;
+
+  public UpdatedMetaDataTracker() {
+    updateMetaDataList = new ArrayList<>();
+    updateMetaDataMap = new HashMap<>();
   }

-  public void set(String replState, String dbName, String tableName, Map <String, String> partSpec) {
-    this.replState = replState;
-    this.dbName = dbName;
-    this.tableName = tableName;
-    if (partSpec != null) {
-      addPartition(partSpec);
+  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+    int size = updateMetaDataList.size();
+    for (UpdateMetaData updateMetaDataOther : other.updateMetaDataList) {
+      String key = getKey(normalizeIdentifier(updateMetaDataOther.getDbName()),
+              normalizeIdentifier(updateMetaDataOther.getTableName()));
+      Integer idx = updateMetaDataMap.get(key);
+      if (idx == null) {
+        updateMetaDataList.add(updateMetaDataOther);
+        updateMetaDataMap.put(key, size++);
+      } else if (updateMetaDataOther.partitionsList != null && updateMetaDataOther.partitionsList.size() != 0) {
+        UpdateMetaData updateMetaData = updateMetaDataList.get(idx);
+        for (Map<String, String> partSpec : updateMetaDataOther.partitionsList) {
+          updateMetaData.addPartition(partSpec);
+        }
+      }
     }
   }

-  public void addPartition(Map <String, String> partSpec) {
-    partitionsList.add(partSpec);
+  public void set(String replState, String dbName, String tableName, Map <String, String> partSpec)
+          throws SemanticException {
+    if (dbName == null) {
+      throw new SemanticException("db name cannot be null");
+    }
+    String key = getKey(normalizeIdentifier(dbName), normalizeIdentifier(tableName));
+    Integer idx = updateMetaDataMap.get(key);
+    if (idx == null) {
+      updateMetaDataList.add(new UpdateMetaData(replState, dbName, tableName, partSpec));
+      updateMetaDataMap.put(key, updateMetaDataList.size() - 1);
+    } else {
+      updateMetaDataList.get(idx).addPartition(partSpec);
+    }
   }

-  public String getReplicationState() {
-    return replState;
+  public void addPartition(String dbName, String tableName, Map <String, String> partSpec) throws SemanticException {
+    if (dbName == null) {
+      throw new SemanticException("db name cannot be null");
+    }
+    String key = getKey(normalizeIdentifier(dbName), normalizeIdentifier(tableName));
+    Integer idx = updateMetaDataMap.get(key);
+    if (idx == null) {
+      throw new SemanticException("cannot add partition to the metadata map: no entry is set yet for table : " + key);
+    }
+    updateMetaDataList.get(idx).addPartition(partSpec);
   }

-  public String getDatabase() {
-    return dbName;
+  public List<UpdateMetaData> getUpdateMetaDataList() {
+    return updateMetaDataList;
   }

-  public String getTable() {
-    return tableName;
+  private String getKey(String dbName, String tableName) {
+    if (tableName == null) {
+      return dbName + ".*";
+    }
+    return dbName + "." + tableName;
   }

-  public List<Map <String, String>> getPartitions() {
-    return partitionsList;
+  private String normalizeIdentifier(String name) {
+    return name == null ? null : HiveStringUtils.normalizeIdentifier(name);
   }
 }
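A note on the UpdatedMetaDataTracker rewrite above: instead of holding a single (db, table, partitions) triple, the tracker now keeps a list of per-table entries keyed by a normalized "db.table" string, so repeated updates for the same table collapse into one entry regardless of identifier case. A minimal sketch of the intended behaviour; the class name TrackerSketch and all sample values are invented for illustration:

import java.util.Collections;

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;

public class TrackerSketch {
  public static void main(String[] args) throws SemanticException {
    UpdatedMetaDataTracker tracker = new UpdatedMetaDataTracker();
    // First update registers the table under the normalized key "sales.orders".
    tracker.set("100", "Sales", "orders", null);
    // A later event for the same table, in a different identifier case, is merged
    // into the existing entry instead of creating a duplicate.
    tracker.addPartition("SALES", "ORDERS", Collections.singletonMap("dt", "2018-10-01"));
    // One entry remains, carrying the repl state plus the accumulated partition spec.
    System.out.println(tracker.getUpdateMetaDataList().size()); // prints 1
  }
}

Because set() registers an entry and addPartition() only appends to an existing one, a caller must set() a table before adding its partitions; that ordering is exactly what the new SemanticException in addPartition() enforces.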
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index afc7426..d3f3306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -48,7 +48,12 @@ public class AbortTxnHandler extends AbstractMessageHandler {
         msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec()),
         context.hiveConf
     );
-    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+
+    // For a warehouse-level dump, don't update the database metadata, as we don't know which database
+    // this txn belongs to. If the event ever gets executed again, re-execution is taken care of anyway.
+    if (!context.isDbNameEmpty()) {
+      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    }
     context.log.debug("Added Abort txn task : {}", abortTxnTask.getId());
     return Collections.singletonList(abortTxnTask);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
index 9bdbf64..63f2577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
@@ -52,7 +52,7 @@ public class AllocWriteIdHandler extends AbstractMessageHandler {
         .getTableName());

     // Repl policy should be created based on the table name in context.
-    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(dbName, context.tableName), dbName, tableName,
+    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), dbName, tableName,
         ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, msg.getTxnToWriteIdList(), context.eventOnlyReplicationSpec());

     Task<? extends Serializable> allocWriteIdTask = TaskFactory.get(work, context.hiveConf);
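Worth spelling out the one-argument fix above: the local dbName/tableName carry the names the objects will have on the target, while the replication policy must be built from the names in the load context. A rough sketch of the distinction, with invented values; the fallback logic shown here stands in for surrounding code that is not part of the hunk:

// In the surrounding handler code (not shown in the hunk), dbName falls back to
// the name carried by the event message when the load context has no db name,
// i.e. for a warehouse-level replication.
String contextDbName = "";            // warehouse-level load: no db in context
String msgDbName = "srcdb";           // db name taken from the alloc-write-id event
String dbName = contextDbName.isEmpty() ? msgDbName : contextDbName;

// Before the fix, getReplPolicy(dbName, ...) pinned the policy to "srcdb.*".
// After the fix, getReplPolicy(context.dbName, ...) yields the catch-all policy
// for a warehouse-level load, matching how the dump was actually requested.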
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index d25102e..0619bd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;

+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -25,7 +30,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.Serializable;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;

 /**
@@ -35,20 +40,75 @@ import java.util.List;
 public class CommitTxnHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context)
-          throws SemanticException {
+      throws SemanticException {
     if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
       context.log.error("Cannot load transaction events as acid is not enabled");
       throw new SemanticException("Cannot load transaction events as acid is not enabled");
     }

     CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload());

-    Task<ReplTxnWork> commitTxnTask = TaskFactory.get(
-        new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
-        msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()),
-        context.hiveConf
-    );
-    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    int numEntry = (msg.getTables() == null ? 0 : msg.getTables().size());
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    String dbName = context.dbName;
+    String tableNamePrev = null;
+    String tblName = context.tableName;
+
+    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName,
+        context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN,
+        context.eventOnlyReplicationSpec());
+
+    if (numEntry > 0) {
+      context.log.debug("Commit txn handler for txnid " + msg.getTxnId() + " databases : " + msg.getDatabases()
+          + " tables : " + msg.getTables() + " partitions : " + msg.getPartitions() + " files : "
+          + msg.getFilesList() + " write ids : " + msg.getWriteIds());
+    }
+
+    for (int idx = 0; idx < numEntry; idx++) {
+      String actualTblName = msg.getTables().get(idx);
+      String actualDBName = msg.getDatabases().get(idx);
+      String completeName = Table.getCompleteName(actualDBName, actualTblName);
+
+      // One import task per table. Events for the same table are kept together in one dump directory
+      // during the dump and are grouped together in the commit txn message.
+      if (tableNamePrev == null || !(completeName.equals(tableNamePrev))) {
+        // The data location is created by the source, so it should be formed based on the table name in the msg.
+        Path location = HiveUtils.getDumpPath(new Path(context.location), actualDBName, actualTblName);
+        tblName = context.isTableNameEmpty() ? actualTblName : context.tableName;
+        // For a warehouse-level dump, use the db name from the write event.
+        dbName = (context.isDbNameEmpty() ? actualDBName : context.dbName);
+        Context currentContext = new Context(context, dbName, tblName);
+        currentContext.setLocation(location.toUri().toString());
+
+        // Piggyback on the import logic for now.
+        TableHandler tableHandler = new TableHandler();
+        tasks.addAll((tableHandler.handle(currentContext)));
+        readEntitySet.addAll(tableHandler.readEntities());
+        writeEntitySet.addAll(tableHandler.writeEntities());
+        getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata());
+        tableNamePrev = completeName;
+      }
+
+      try {
+        WriteEventInfo writeEventInfo = new WriteEventInfo(msg.getWriteIds().get(idx),
+            dbName, tblName, msg.getFiles(idx));
+        if (msg.getPartitions().get(idx) != null && !msg.getPartitions().get(idx).isEmpty()) {
+          writeEventInfo.setPartition(msg.getPartitions().get(idx));
+        }
+        work.addWriteEventInfo(writeEventInfo);
+      } catch (Exception e) {
+        throw new SemanticException("Failed to extract write event info from commit txn message : " + e.getMessage());
+      }
+    }
+
+    Task<ReplTxnWork> commitTxnTask = TaskFactory.get(work, context.hiveConf);
+
+    // For a warehouse-level dump, don't update the database metadata, as we don't know which database
+    // this txn belongs to. If the event ever gets executed again, re-execution is taken care of anyway.
+    if (!context.isDbNameEmpty()) {
+      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    }
     context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());
-    return Collections.singletonList(commitTxnTask);
+    DAGTraversal.traverse(tasks, new AddDependencyToLeaves(commitTxnTask));
+    return tasks;
   }
 }
+
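The DAGTraversal call that now ends handle() is the crux of the change: the per-table import tasks form a DAG, and the commit-txn task must run only after all of them finish, so it is hung off every leaf of that DAG. A toy illustration of the AddDependencyToLeaves idea; ToyTask and the helper below are invented stand-ins, not Hive APIs:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for Hive's task DAG: walk the graph and attach the given task
// to every leaf, so it executes only after all import work has completed.
class ToyTask {
  final String name;
  final List<ToyTask> children = new ArrayList<>();
  ToyTask(String name) { this.name = name; }
}

public class LeavesSketch {
  static void addDependencyToLeaves(List<ToyTask> roots, ToyTask dependent) {
    for (ToyTask t : roots) {
      if (t.children.isEmpty()) {
        t.children.add(dependent);               // leaf: attach the commit task here
      } else {
        addDependencyToLeaves(t.children, dependent);
      }
    }
  }

  public static void main(String[] args) {
    ToyTask import1 = new ToyTask("import table t1");
    ToyTask import2 = new ToyTask("import table t2");
    ToyTask move2 = new ToyTask("move files for t2");
    import2.children.add(move2);
    List<ToyTask> tasks = new ArrayList<>(Arrays.asList(import1, import2));

    ToyTask commitTxn = new ToyTask("repl commit txn");
    addDependencyToLeaves(tasks, commitTxn);
    // commitTxn is now a child of both leaves (import1 and move2), mirroring
    // DAGTraversal.traverse(tasks, new AddDependencyToLeaves(commitTxnTask)).
  }
}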
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index ef4a901..cdf51dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -46,8 +46,8 @@
   UpdatedMetaDataTracker getUpdatedMetadata();

   class Context {
-    public String dbName;
-    public final String tableName, location;
+    public String location;
+    public final String tableName, dbName;
     public final Task<? extends Serializable> precursor;
     public DumpMetaData dmd;
     final HiveConf hiveConf;
@@ -101,5 +101,9 @@
     public HiveTxnManager getTxnMgr() {
       return nestedContext.getHiveTxnManager();
     }
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
index 190e021..5dcc44e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -47,7 +47,12 @@ public class OpenTxnHandler extends AbstractMessageHandler {
         msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN, context.eventOnlyReplicationSpec()),
         context.hiveConf
     );
-    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+
+    // For a warehouse-level dump, don't update the database metadata, as we don't know which database
+    // this txn belongs to. If the event ever gets executed again, re-execution is taken care of anyway.
+    if (!context.isDbNameEmpty()) {
+      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    }
     context.log.debug("Added Open txn task : {}", openTxnTask.getId());
     return Collections.singletonList(openTxnTask);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9a1e3a1..47a56d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -40,6 +40,7 @@ public class MoveWork implements Serializable {
   private LoadMultiFilesDesc loadMultiFilesWork;
   private boolean checkFileFormat;
   private boolean srcLocal;
+  private boolean needCleanTarget;

   /**
    * ReadEntitites that are passed to the hooks.
@@ -63,6 +64,7 @@
   private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
+    this.needCleanTarget = true;
   }

   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
@@ -93,6 +95,7 @@
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
+    needCleanTarget = o.needCleanTarget;
   }

   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -153,5 +156,12 @@
   public void setSrcLocal(boolean srcLocal) {
     this.srcLocal = srcLocal;
   }
-
+
+  public boolean isNeedCleanTarget() {
+    return needCleanTarget;
+  }
+
+  public void setNeedCleanTarget(boolean needCleanTarget) {
+    this.needCleanTarget = needCleanTarget;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
index 3c853c9..a6ab836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.plan;

 import java.io.Serializable;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;

@@ -40,6 +42,7 @@ public class ReplTxnWork implements Serializable {
   private List<Long> txnIds;
   private List<TxnToWriteId> txnToWriteIdList;
   private ReplicationSpec replicationSpec;
+  private List<WriteEventInfo> writeEventInfos;

   /**
    * OperationType.
@@ -60,6 +63,7 @@
     this.replPolicy = replPolicy;
     this.txnToWriteIdList = txnToWriteIdList;
     this.replicationSpec = replicationSpec;
+    this.writeEventInfos = null;
   }

   public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
@@ -86,6 +90,13 @@
     this.operation = type;
   }

+  public void addWriteEventInfo(WriteEventInfo writeEventInfo) {
+    if (this.writeEventInfos == null) {
+      this.writeEventInfos = new ArrayList<>();
+    }
+    this.writeEventInfos.add(writeEventInfo);
+  }
+
   public List<Long> getTxnIds() {
     return txnIds;
   }
@@ -121,4 +132,8 @@
   public ReplicationSpec getReplicationSpec() {
     return replicationSpec;
   }
+
+  public List<WriteEventInfo> getWriteEventInfos() {
+    return writeEventInfos;
+  }
 }
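On the ReplTxnWork change just above: writeEventInfos is lazily allocated, so transaction work items that carry no write events (open, abort, alloc-write-id) pay nothing for the new field. A small usage sketch; the sample values are invented, while the constructor and the four-argument WriteEventInfo call mirror the usages shown in the diffs above, with a null ReplicationSpec as a placeholder:

import java.util.Collections;

import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.ql.plan.ReplTxnWork;

public class ReplTxnWorkSketch {
  public static void main(String[] args) {
    // Placeholder values; a real caller passes the policy/names from the load context.
    ReplTxnWork work = new ReplTxnWork("srcdb.*", "srcdb", null,
        Collections.singletonList(42L), ReplTxnWork.OperationType.REPL_OPEN_TXN, null);

    // Until the first write event is added, the list stays null.
    System.out.println(work.getWriteEventInfos()); // prints null

    // CommitTxnHandler adds one entry per (write id, db, table, files) tuple.
    work.addWriteEventInfo(new WriteEventInfo(1L, "srcdb", "t1", "file-list"));
    System.out.println(work.getWriteEventInfos().size()); // prints 1
  }
}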