ankitsol commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r3462558979
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -90,38 +103,83 @@ public ReplicationSourceShipper(Configuration conf, String
walGroupId, Replicati
this.conf.getInt("replication.source.getEntries.timeout",
DEFAULT_TIMEOUT);
this.shipEditsTimeout =
this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+ this.offsetUpdateIntervalMs =
+ conf.getLong(OFFSET_UPDATE_INTERVAL_MS_KEY,
DEFAULT_OFFSET_UPDATE_INTERVAL_MS);
+ this.offsetUpdateSizeThresholdBytes =
+ conf.getLong(OFFSET_UPDATE_SIZE_THRESHOLD_KEY,
DEFAULT_OFFSET_UPDATE_SIZE_THRESHOLD);
}
@Override
public final void run() {
setWorkerState(WorkerState.RUNNING);
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}",
this.walGroupId);
- // Loop until we close down
- while (isActive()) {
- // Sleep until replication is enabled again
- if (!source.isPeerEnabled()) {
- // The peer enabled check is in memory, not expensive, so do not need
to increase the
- // sleep interval as it may cause a long lag when we enable the peer.
- sleepForRetries("Replication is disabled", sleepForRetries, 1,
maxRetriesMultiplier);
- continue;
- }
- try {
- WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
- LOG.debug("Shipper from source {} got entry batch from reader: {}",
source.getQueueId(),
- entryBatch);
- if (entryBatch == null) {
+ try {
+ // Loop until we close down
+ while (isActive()) {
+ // Sleep until replication is enabled again
+ if (!source.isPeerEnabled()) {
+ // The peer enabled check is in memory, not expensive, so do not
need to increase the
+ // sleep interval as it may cause a long lag when we enable the peer.
+ sleepForRetries("Replication is disabled", sleepForRetries, 1,
maxRetriesMultiplier);
continue;
}
- // the NO_MORE_DATA instance has no path so do not call shipEdits
- if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
- noMoreData();
- } else {
- shipEdits(entryBatch);
+ try {
+ // check time-based offset persistence
+ if (shouldPersistLogPosition()) {
+ persistLogPosition();
+ }
+
+ long pollTimeout = getEntriesTimeout;
+ if (offsetUpdateIntervalMs != Long.MAX_VALUE) {
+ long elapsed = EnvironmentEdgeManager.currentTime() -
lastOffsetUpdateTime;
+ long remaining = offsetUpdateIntervalMs - elapsed;
+ if (remaining > 0) {
+ pollTimeout = Math.min(getEntriesTimeout, remaining);
+ }
+ }
+ WALEntryBatch entryBatch = entryReader.poll(pollTimeout);
+ LOG.debug("Shipper from source {} got entry batch from reader: {}",
source.getQueueId(),
+ entryBatch);
+
+ if (entryBatch == null) {
+ continue;
+ }
+ // the NO_MORE_DATA instance has no path so do not call shipEdits
+ if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
+ noMoreData();
+ } else {
+ shipEdits(entryBatch);
+ }
+ } catch (InterruptedException | ReplicationRuntimeException e) {
+ LOG.warn("Interrupted while waiting for next replication entry
batch", e);
+ Thread.currentThread().interrupt();
+ } catch (IOException ioe) {
+ // During source shutdown / peer removal we can see interrupted IOEs
+ // from replication queue updates. Do not restart in this case.
+ if (!source.isSourceActive() || isInterrupted() ||
!source.isPeerEnabled()) {
+ LOG.info("Ignoring persist failure during shutdown for
walGroupId={}", walGroupId, ioe);
+ break;
+ }
+ LOG.error("Shipper {} failed to persist offset, restarting",
walGroupId, ioe);
+ abortAndRestart(ioe);
+ break;
+ }
+ }
+ } finally {
Review Comment:
Good point! Yes, the finally block was there to cover any unchecked exception.
I like the idea of merging it, as you pointed out.
So, I made the following changes in this new commit:
- Moved the final persistLogPosition() call into the normal worker exit path
before transitioning to STOPPED (as you suggested)
- Removed the abortingForRestart flag, as it was only needed to coordinate
with the finally block.
- Changed restart handling to abortAndRestart(...); return; (instead of
'break') so the old shipper exits immediately after scheduling a replacement
and does not execute any further shutdown logic.
- Expanded the catch block to handle all exceptions (Exception instead of
just IOException) from offset persistence (including unchecked exceptions from
beforePersistingReplicationOffset()).
Please let me know if it looks good @Apache9
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]