[ 
https://issues.apache.org/jira/browse/HIVE-24783?focusedWorklogId=558231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-558231
 ]

ASF GitHub Bot logged work on HIVE-24783:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/21 21:51
            Start Date: 25/Feb/21 21:51
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #2005:
URL: https://github.com/apache/hive/pull/2005#discussion_r583218805



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);

Review comment:
       In this log message, rename "NotificationACK file" to "Load metadata file".

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +504,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, NOTIFICATION_FILE.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Use a variant of RuntimeException and throw it back
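
A minimal sketch of what this could look like, assuming the pre-ack work only throws IOException. `PreAckTaskSketch`, `MetadataWriter` and `asPreAckTask` are hypothetical names used for illustration and are not part of the PR; the real code would pick whichever unchecked exception type fits Hive's error handling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class PreAckTaskSketch {
  /** Hypothetical stand-in for the metastore call plus file write shown in the diff. */
  interface MetadataWriter {
    void write() throws IOException;
  }

  /** Wraps the checked work in a Runnable that rethrows failures as unchecked exceptions. */
  static Runnable asPreAckTask(MetadataWriter writer) {
    return () -> {
      try {
        writer.write();
      } catch (IOException e) {
        // Propagate instead of calling e.printStackTrace(), so the caller sees the failure.
        throw new UncheckedIOException("Failed to write load metadata file", e);
      }
    };
  }
}
```

The original exception is kept as the cause, so the stack trace that printStackTrace() would have shown is still available to whoever handles the failure.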

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
 
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {

Review comment:
       Move it to TestReplicationScenariosAcidTables

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){

Review comment:
       A few lines like this exceed the default max line length (120, I think) for checkstyle. You may want to reformat them.
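
One way to shorten this particular condition is to replace the ternary with `||`. The sketch below models the three calls as plain booleans (an assumption made for illustration; `AckConditionSketch` and its methods are hypothetical names) and checks that the removed condition and a shorter form agree for every input.

```java
class AckConditionSketch {
  /** The condition from the removed lines of the diff. */
  static boolean original(boolean incremental, boolean hasMoreWork, boolean hasBootstrap) {
    return (incremental && !hasMoreWork && !hasBootstrap) || (!incremental && !hasBootstrap);
  }

  /** Shorter form: "incremental ? !hasMoreWork : true" is the same as "!incremental || !hasMoreWork". */
  static boolean simplified(boolean incremental, boolean hasMoreWork, boolean hasBootstrap) {
    return !hasBootstrap && (!incremental || !hasMoreWork);
  }

  /** Compares both forms over all eight combinations of the three flags. */
  static boolean equivalentForAllInputs() {
    boolean[] values = {false, true};
    for (boolean incremental : values) {
      for (boolean hasMoreWork : values) {
        for (boolean hasBootstrap : values) {
          if (original(incremental, hasMoreWork, hasBootstrap)
              != simplified(incremental, hasMoreWork, hasBootstrap)) {
            return false;
          }
        }
      }
    }
    return true;
  }
}
```

Because `||` short-circuits, the second operand is still evaluated only for incremental loads, which matches how the ternary calls incrementalLoadTasksBuilder() in the diff.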

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
##########
@@ -414,6 +418,24 @@ WarehouseInstance verifyResults(List data) throws IOException {
     return this;
   }
 
+  long verifyNotificationAck(String dumpLocation, long prevNotificationID) throws Exception {
+    FileSystem fs = new Path(dumpLocation).getFileSystem(hiveConf);
+    Path notificationAckFile = new Path(dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplAck.NOTIFICATION_FILE);
+    assertTrue(fs.exists(notificationAckFile));
+    long currentNotificationID = getCurrentNotificationEventId().getEventId();
+    long previousLoadNotificationID = fetchNotificationIDFromDump(notificationAckFile, fs);
+    assertTrue(previousLoadNotificationID > prevNotificationID && currentNotificationID > previousLoadNotificationID);
+    return previousLoadNotificationID;
+  }
+
+  long fetchNotificationIDFromDump(Path notificationAckFile, FileSystem fs) throws Exception{
+    InputStream inputstream = fs.open(notificationAckFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inputstream));
+    String line = reader.readLine();
+    assertTrue(line!=null && reader.readLine()==null);
+    return Long.parseLong(line);

Review comment:
       close the reader/stream
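
A try-with-resources block would handle this. The sketch below uses java.nio.file instead of the Hadoop FileSystem so that it runs on its own (that substitution is an assumption for illustration, and `NotificationIdReaderSketch` and `readSingleLong` are hypothetical names); the same pattern should apply to a BufferedReader wrapped around the stream returned by fs.open(...).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class NotificationIdReaderSketch {
  /** Reads a file that is expected to hold exactly one line containing a long value. */
  static long readSingleLong(Path file) throws IOException {
    // The reader, and the stream underneath it, is closed when the block exits,
    // whether it exits normally or through an exception.
    try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
      String line = reader.readLine();
      if (line == null || reader.readLine() != null) {
        throw new IOException("Expected exactly one line in " + file);
      }
      return Long.parseLong(line.trim());
    }
  }
}
```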

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -500,11 +503,25 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+    if (!work.hasBootstrapLoadTasks() && (work.isIncrementalLoad() ? !work.incrementalLoadTasksBuilder().hasMoreWork() : true)){
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
+      List<Runnable> listOfPreAckTasks = new LinkedList<>();
+      listOfPreAckTasks.add(new Runnable() {
+        @Override
+        public void run() {
+          try{
+            HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+            long currentNotificationID = metaStoreClient.getCurrentNotificationEventId().getEventId();
+            Path notificationFilePath = new Path(work.dumpDirectory, LOAD_METADATA.toString());
+            Utils.writeOutput(String.valueOf(currentNotificationID), notificationFilePath, conf);
+            LOG.info("Created NotificationACK file : {} with NotificationID : {}", notificationFilePath, currentNotificationID);
+          }catch (Exception e) {
+            throw new RuntimeException(e);

Review comment:
       Does this become a non-recoverable error or a recoverable one?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -51,8 +51,12 @@
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;

Review comment:
       Remove unused import

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
##########
@@ -35,6 +36,7 @@
   private static final long serialVersionUID = 1L;
   private Path ackFilePath;
   private transient ReplicationMetricCollector metricCollector;
+  private List<Runnable> tasks;

Review comment:
       nit: How about renaming to preAckTasks to avoid confusion?






Issue Time Tracking
-------------------

    Worklog Id:     (was: 558231)
    Time Spent: 1h  (was: 50m)

> Store currentNotificationID on target during repl load operation
> ----------------------------------------------------------------
>
>                 Key: HIVE-24783
>                 URL: https://issues.apache.org/jira/browse/HIVE-24783
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
