[ https://issues.apache.org/jira/browse/HIVE-24783?focusedWorklogId=558231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-558231 ]
ASF GitHub Bot logged work on HIVE-24783:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/21 21:51
            Start Date: 25/Feb/21 21:51
    Worklog Time Spent: 10m
      Work Description:
pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r583218805

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);

Review comment:
NotificationACK file -> Load metadata file

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +504,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, NOTIFICATION_FILE.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            e.printStackTrace();

Review comment:
Use a variant of RuntimeException and throw it back.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {

Review comment:
Move it to TestReplicationScenariosAcidTables.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){

Review comment:
A few lines like this exceed the default max line length (120, I think) for checkstyle. You may want to reformat them.

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    Path notificationAckFile = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.NOTIFICATION_FILE);
+    assertTrue(fs.exists(notificationAckFile));
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    long previousLoadNotificationID = fetchNotificationIDFromDump(notificationAckFile, fs);
+    assertTrue(previousLoadNotificationID > prevNotificationID && currentNotificationID > previousLoadNotificationID);
+    return previousLoadNotificationID;
+  }
+
+  long fetchNotificationIDFromDump(Path notificationAckFile, FileSystem fs) throws Exception{
+    InputStream inputstream = fs.open(notificationAckFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputstream));
+    String line = reader.readLine();
+    assertTrue(line!=null && reader.readLine()==null);
+    return Long.parseLong(line);

Review comment:
Close the reader/stream.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            throw new RuntimeException(e);

Review comment:
Does it become a non-recoverable error or a recoverable error?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -51,8 +51,12 @@
 import javax.annotation.Nullable;
+import java.io.Closeable;

Review comment:
Remove unused import.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
##########
@@ -35,6 +36,7 @@
   private static final long serialVersionUID = 1L;
   private Path ackFilePath;
   private transient ReplicationMetricCollector metricCollector;
+  private List<Runnable> tasks;

Review comment:
nit: How about renaming to preAckTasks to avoid confusion?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 558231)
    Time Spent: 1h  (was: 50m)

> Store currentNotificationID on target during repl load operation
> ----------------------------------------------------------------
>
>                 Key: HIVE-24783
>                 URL: https://issues.apache.org/jira/browse/HIVE-24783
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h
>  Remaining Estimate: 0h
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
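
One of the review comments above asks that fetchNotificationIDFromDump close the reader/stream. A minimal sketch of one way to do that with try-with-resources follows. The class name NotificationIdReader and the method readSingleLong are hypothetical and not part of the pull request, and a plain java.io.InputStream stands in for the stream returned by fs.open(...) so that the example runs without Hadoop on the classpath. The sketch also throws IOException where the test helper uses assertTrue.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the pull request under review.
final class NotificationIdReader {
  private NotificationIdReader() {
  }

  // Reads a stream that is expected to hold exactly one line containing a long.
  // try-with-resources closes the reader on every exit path, and closing the
  // BufferedReader also closes the wrapped InputStream.
  static long readSingleLong(InputStream in) throws IOException {
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      String line = reader.readLine();
      if (line == null || reader.readLine() != null) {
        throw new IOException("Expected exactly one line");
      }
      return Long.parseLong(line.trim());
    }
  }

  public static void main(String[] args) throws IOException {
    // In-memory stand-in for the notification file contents.
    InputStream in = new ByteArrayInputStream("12345\n".getBytes(StandardCharsets.UTF_8));
    System.out.println(readSingleLong(in));
  }
}
```

In the test helper itself, the same try block could presumably wrap the stream obtained from fs.open(notificationAckFile), keeping the existing assertTrue check inside the block.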