Apache9 commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r3270997641


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -134,6 +186,15 @@ public final void run() {
   }
 
   private void noMoreData() {
+    try {
+      // Flush any delayed offset before finishing queue
+      persistLogPosition();
+    } catch (IOException e) {
+      LOG.error("Failed to persist final replication offset for 
walGroupId={}", walGroupId, e);
+      abortAndRestart(e);

Review Comment:
   IIRC I've mentioned this elsewhere in this PR: we'd better throw the exception up to
the top layer and decide what to do there. I'm afraid that if we call abortAndRestart
here and then just return normally, the upper layer will still go on to do other things
that mess up the state...



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -90,38 +103,77 @@ public ReplicationSourceShipper(Configuration conf, String 
walGroupId, Replicati
       this.conf.getInt("replication.source.getEntries.timeout", 
DEFAULT_TIMEOUT);
     this.shipEditsTimeout = 
this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
       HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+    this.offsetUpdateIntervalMs =
+      conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, 
DEFAULT_OFFSET_UPDATE_INTERVAL_MS);
+    this.offsetUpdateSizeThresholdBytes =
+      conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, 
DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD);
   }
 
   @Override
   public final void run() {
     setWorkerState(WorkerState.RUNNING);
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", 
this.walGroupId);
-    // Loop until we close down
-    while (isActive()) {
-      // Sleep until replication is enabled again
-      if (!source.isPeerEnabled()) {
-        // The peer enabled check is in memory, not expensive, so do not need 
to increase the
-        // sleep interval as it may cause a long lag when we enable the peer.
-        sleepForRetries("Replication is disabled", sleepForRetries, 1, 
maxRetriesMultiplier);
-        continue;
-      }
-      try {
-        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
-        LOG.debug("Shipper from source {} got entry batch from reader: {}", 
source.getQueueId(),
-          entryBatch);
-        if (entryBatch == null) {
+    try {
+      // Loop until we close down
+      while (isActive()) {
+        // Sleep until replication is enabled again
+        if (!source.isPeerEnabled()) {
+          // The peer enabled check is in memory, not expensive, so do not 
need to increase the
+          // sleep interval as it may cause a long lag when we enable the peer.
+          sleepForRetries("Replication is disabled", sleepForRetries, 1, 
maxRetriesMultiplier);
           continue;
         }
-        // the NO_MORE_DATA instance has no path so do not call shipEdits
-        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
-          noMoreData();
-        } else {
-          shipEdits(entryBatch);
+        try {
+          // check time-based offset persistence
+          if (shouldPersistLogPosition()) {
+            persistLogPosition();
+          }
+
+          long pollTimeout = getEntriesTimeout;
+          if (offsetUpdateIntervalMs != Long.MAX_VALUE) {
+            long elapsed = EnvironmentEdgeManager.currentTime() - 
lastOffsetUpdateTime;
+            long remaining = offsetUpdateIntervalMs - elapsed;
+            if (remaining > 0) {
+              pollTimeout = Math.min(getEntriesTimeout, remaining);
+            }
+          }
+          WALEntryBatch entryBatch = entryReader.poll(pollTimeout);
+          LOG.debug("Shipper from source {} got entry batch from reader: {}", 
source.getQueueId(),
+            entryBatch);
+
+          if (entryBatch == null) {
+            continue;
+          }
+          // the NO_MORE_DATA instance has no path so do not call shipEdits
+          if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+            noMoreData();
+          } else {
+            shipEdits(entryBatch);
+          }
+        } catch (InterruptedException | ReplicationRuntimeException e) {
+          LOG.warn("Interrupted while waiting for next replication entry 
batch", e);
+          Thread.currentThread().interrupt();
+        } catch (IOException ioe) {
+          // During source shutdown / peer removal we can see interrupted IOEs
+          // from replication queue updates. Do not restart in this case.
+          if (!source.isSourceActive() || isInterrupted() || 
!source.isPeerEnabled()) {
+            LOG.info("Ignoring persist failure during shutdown for 
walGroupId={}", walGroupId, ioe);
+            break;
+          }
+          LOG.error("Shipper {} failed to persist offset, restarting", 
walGroupId, ioe);
+          abortAndRestart(ioe);
+          break;
+        }
+      }
+    } finally {
+      // Flush buffered offsets before worker exits
+      if (!isFinished() && !abortingForRestart) {
+        try {
+          persistLogPosition();
+        } catch (IOException e) {
+          LOG.warn("Failed persisting final offset during shutdown for 
walGroupId={}", walGroupId,

Review Comment:
   Please add comments to explain why we do not need to handle this exception
here.



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceShipperRestart.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.InOrder;
+
+@Category({ ReplicationTests.class, SmallTests.class })

Review Comment:
   Please use JUnit 5.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -90,38 +103,77 @@ public ReplicationSourceShipper(Configuration conf, String 
walGroupId, Replicati
       this.conf.getInt("replication.source.getEntries.timeout", 
DEFAULT_TIMEOUT);
     this.shipEditsTimeout = 
this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
       HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+    this.offsetUpdateIntervalMs =
+      conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY, 
DEFAULT_OFFSET_UPDATE_INTERVAL_MS);
+    this.offsetUpdateSizeThresholdBytes =
+      conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY, 
DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD);
   }
 
   @Override
   public final void run() {
     setWorkerState(WorkerState.RUNNING);
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", 
this.walGroupId);
-    // Loop until we close down
-    while (isActive()) {
-      // Sleep until replication is enabled again
-      if (!source.isPeerEnabled()) {
-        // The peer enabled check is in memory, not expensive, so do not need 
to increase the
-        // sleep interval as it may cause a long lag when we enable the peer.
-        sleepForRetries("Replication is disabled", sleepForRetries, 1, 
maxRetriesMultiplier);
-        continue;
-      }
-      try {
-        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
-        LOG.debug("Shipper from source {} got entry batch from reader: {}", 
source.getQueueId(),
-          entryBatch);
-        if (entryBatch == null) {
+    try {
+      // Loop until we close down
+      while (isActive()) {
+        // Sleep until replication is enabled again
+        if (!source.isPeerEnabled()) {
+          // The peer enabled check is in memory, not expensive, so do not 
need to increase the
+          // sleep interval as it may cause a long lag when we enable the peer.
+          sleepForRetries("Replication is disabled", sleepForRetries, 1, 
maxRetriesMultiplier);
           continue;
         }
-        // the NO_MORE_DATA instance has no path so do not call shipEdits
-        if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
-          noMoreData();
-        } else {
-          shipEdits(entryBatch);
+        try {
+          // check time-based offset persistence
+          if (shouldPersistLogPosition()) {
+            persistLogPosition();
+          }
+
+          long pollTimeout = getEntriesTimeout;
+          if (offsetUpdateIntervalMs != Long.MAX_VALUE) {
+            long elapsed = EnvironmentEdgeManager.currentTime() - 
lastOffsetUpdateTime;
+            long remaining = offsetUpdateIntervalMs - elapsed;
+            if (remaining > 0) {
+              pollTimeout = Math.min(getEntriesTimeout, remaining);
+            }
+          }
+          WALEntryBatch entryBatch = entryReader.poll(pollTimeout);
+          LOG.debug("Shipper from source {} got entry batch from reader: {}", 
source.getQueueId(),
+            entryBatch);
+
+          if (entryBatch == null) {
+            continue;
+          }
+          // the NO_MORE_DATA instance has no path so do not call shipEdits
+          if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+            noMoreData();
+          } else {
+            shipEdits(entryBatch);
+          }
+        } catch (InterruptedException | ReplicationRuntimeException e) {
+          LOG.warn("Interrupted while waiting for next replication entry 
batch", e);
+          Thread.currentThread().interrupt();
+        } catch (IOException ioe) {
+          // During source shutdown / peer removal we can see interrupted IOEs
+          // from replication queue updates. Do not restart in this case.
+          if (!source.isSourceActive() || isInterrupted() || 
!source.isPeerEnabled()) {
+            LOG.info("Ignoring persist failure during shutdown for 
walGroupId={}", walGroupId, ioe);
+            break;
+          }
+          LOG.error("Shipper {} failed to persist offset, restarting", 
walGroupId, ioe);
+          abortAndRestart(ioe);
+          break;
+        }
+      }
+    } finally {
+      // Flush buffered offsets before worker exits
+      if (!isFinished() && !abortingForRestart) {

Review Comment:
   How can we reach this branch? Is it only when the peer config is refreshed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to