Apache9 commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r3044610064
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
Review Comment:
As far as I recall, there are cases, such as updating the replication config,
where we terminate a replication source and recreate it before the source
finishes, so we may not always need to restart the worker when we get here.
Still, I'm a bit nervous that we would miss some scenarios that do need a
restart if we only call abortAndRestart when catching an exception...
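A minimal sketch of the distinction raised above, using a hypothetical `ShipperExitSketch` model rather than the real `ReplicationSourceShipper`. The worker records why it exited, and the owner restarts on any exit it did not ask for (a failure or a silent return). An intentional stop, such as a config-driven source refresh, does not trigger a restart. The restart decision is keyed on "was a stop requested" instead of "was an exception caught", so the exception path is not the only way to reach a restart.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model, not HBase code: classifies a worker's exit by whether
// its owner requested a stop, rather than by whether an exception was caught.
public class ShipperExitSketch {

  public enum ExitReason { STOP_REQUESTED, FAILED, RETURNED_UNEXPECTEDLY }

  public static final class Worker {
    // Set by the owner (e.g. on a config-driven refresh) before stopping.
    public final AtomicBoolean stopRequested = new AtomicBoolean(false);
    public volatile ExitReason exitReason;

    public void run(Runnable body) {
      try {
        body.run();
        // The body returned: intentional only if a stop was requested.
        exitReason = stopRequested.get()
            ? ExitReason.STOP_REQUESTED
            : ExitReason.RETURNED_UNEXPECTEDLY;
      } catch (RuntimeException e) {
        // An exception during a requested stop is still an intentional stop.
        exitReason = stopRequested.get() ? ExitReason.STOP_REQUESTED : ExitReason.FAILED;
      }
    }
  }

  // Restart on every exit the owner did not ask for, not only on exceptions.
  public static boolean shouldRestart(Worker w) {
    return w.exitReason != ExitReason.STOP_REQUESTED;
  }
}
```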
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -215,6 +277,12 @@ private void shipEdits(WALEntryBatch entryBatch) {
entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
}
break;
+ } catch (IOException ioe) {
+ // Offset-Persist failure is treated as fatal to this shipper since it might come from
+ // beforePersistingReplicationOffset. So abort and restart the Shipper, and WAL reading
+ // will resume from the last successfully persisted offset
+ abortAndRestart(ioe);
Review Comment:
Better to rethrow the exception to the top layer and call abortAndRestart
there? After returning from this method, the parent method may still run other
logic, which could cause inconsistency since the new shipper may already have
been started...
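A minimal sketch of the suggested structure, using hypothetical names (`RethrowSketch`, `shipOneBatch`) that only loosely mirror the shipper. `shipEdits` lets the `IOException` propagate, so the parent's post-ship bookkeeping is skipped. Only the top-level `run` calls `abortAndRestart`, and it returns right away so the old shipper does no further work once a replacement may exist.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not HBase code: records the order of steps so the
// control flow on success and on failure can be compared.
public class RethrowSketch {
  public final List<String> events = new ArrayList<>();

  // Inner step: reports the failure but never restarts anything itself.
  void shipEdits(boolean persistFails) throws IOException {
    events.add("ship");
    if (persistFails) {
      throw new IOException("offset persist failed");
    }
    events.add("persisted");
  }

  // Parent step: bookkeeping after shipEdits runs only when it succeeded.
  void shipOneBatch(boolean persistFails) throws IOException {
    shipEdits(persistFails);
    events.add("post-ship bookkeeping");
  }

  // Top layer: the single place that aborts and restarts, then stops doing
  // work in this (old) shipper.
  public void run(boolean persistFails) {
    try {
      shipOneBatch(persistFails);
    } catch (IOException e) {
      abortAndRestart(e);
      return;
    }
    events.add("continue loop");
  }

  void abortAndRestart(IOException cause) {
    events.add("abortAndRestart: " + cause.getMessage());
  }
}
```

On failure the event log ends at the restart, with no bookkeeping or loop continuation recorded after it.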
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java:
##########
@@ -866,4 +866,24 @@ public long getTotalReplicatedEdits() {
long getSleepForRetries() {
return sleepForRetries;
}
+
+ void restartShipper(String walGroupId, ReplicationSourceShipper oldWorker) {
+ workerThreads.compute(walGroupId, (key, current) -> {
+ if (current != oldWorker) {
+ return current; // already replaced
+ }
+
+ LOG.warn("Restarting shipper for walGroupId={}", walGroupId);
+
+ try {
+ ReplicationSourceShipper newWorker = createNewShipper(walGroupId);
+ startShipper(newWorker);
+ return newWorker;
+ } catch (Exception e) {
+ LOG.error("Failed to restart shipper for walGroupId={}", walGroupId, e);
+ return current; // retry later
Review Comment:
Checked the code: neither createNewShipper nor startShipper throws any
exception, so do we really need to catch exceptions here?
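If neither helper throws, the remapping function can drop the try/catch. A minimal sketch, assuming `workerThreads` is a `ConcurrentHashMap` and using a hypothetical `RestartSketch.Shipper` stand-in for `ReplicationSourceShipper`. Per the `ConcurrentHashMap.compute` contract, an unchecked exception thrown by the remapping function is rethrown and the existing mapping is left unchanged. So even without the catch, a failure would not replace the current worker; the catch in the diff mainly swallows the error.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch, not HBase code.
public class RestartSketch {

  // Stand-in for ReplicationSourceShipper.
  public static final class Shipper {
    public final String walGroupId;
    public volatile boolean started;

    public Shipper(String walGroupId) {
      this.walGroupId = walGroupId;
    }
  }

  public final ConcurrentMap<String, Shipper> workerThreads = new ConcurrentHashMap<>();

  Shipper createNewShipper(String walGroupId) {
    return new Shipper(walGroupId);
  }

  void startShipper(Shipper s) {
    s.started = true;
  }

  public void restartShipper(String walGroupId, Shipper oldWorker) {
    workerThreads.compute(walGroupId, (key, current) -> {
      if (current != oldWorker) {
        return current; // already replaced (or removed): keep what is there
      }
      Shipper newWorker = createNewShipper(key);
      startShipper(newWorker);
      return newWorker;
    });
  }
}
```

A stale caller that still holds the old worker reference becomes a no-op, because the identity check fails once the entry has been replaced.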