This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push: new 2ff9f46 GEODE-9180: warn when heartbeat thread oversleeps (#6360) 2ff9f46 is described below commit 2ff9f4696028d05dbca669f3d873caf9e75922be Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Wed Apr 28 10:22:16 2021 -0700 GEODE-9180: warn when heartbeat thread oversleeps (#6360) * heartbeat producer logs warning when it oversleeps by a period or more (cherry picked from commit f8b07a007ac93c323cd888cbc53dc3914336077f) --- .../gms/fd/GMSHealthMonitorJUnitTest.java | 47 +++++ .../membership/gms/fd/GMSHealthMonitor.java | 190 ++++++++++++--------- 2 files changed, 161 insertions(+), 76 deletions(-) diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java index 2aaf2f5..81e132a 100644 --- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java +++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java @@ -48,6 +48,7 @@ import java.util.Timer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.LongUnaryOperator; import org.junit.After; import org.junit.Assert; @@ -952,6 +953,52 @@ public class GMSHealthMonitorJUnitTest { executeTestDoTCPCheck(GMSHealthMonitor.ERROR + 100, false); } + @Test + public void heartbeatOversleepCausesWarning() { + testHeartbeatSleepScenario(sleepLimit -> sleepLimit + 1, + "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: 1,000,000,001 nanoseconds. 
Period: 500,000,000 nanoseconds."); + } + + @Test + public void heartbeatOnTimeWakeupCausesNoWarning() { + testHeartbeatSleepScenario(sleepLimit -> sleepLimit, + null); + } + + private void testHeartbeatSleepScenario(final LongUnaryOperator actualSleepPeriod, + final String expectedLogWarning) { + + /* + * Creating a class here because it's a convenient way to provide (mutable) variables needed + * by the lambdas. Without the class, each of them would have to be arrays or atomics + * or some other kind of "holder object". By creating a class they can simply be fields. + */ + new Runnable() { + // the thing we're testing + final GMSHealthMonitor.Heart heart = gmsHealthMonitor.new Heart(); + int periodNumber = 0; // index into times + String capturedMessage; // warning message (if any) generated by heart + + @Override + public void run() { + heart.sendPeriodicHeartbeats(sleepMillis -> { + }, + () -> { + switch (periodNumber++) { + case 0: + return 0L; + case 1: + default: + gmsHealthMonitor.stop(); + return actualSleepPeriod.applyAsLong(heart.sleepLimitNanos); + } + }, + msg -> capturedMessage = msg); + assertThat(capturedMessage).isEqualTo(expectedLogWarning); + } + }.run(); + } + private void executeTestDoTCPCheck(int receivedStatus, boolean expectedResult) throws Exception { MemberIdentifier otherMember = createGMSMember(Version.CURRENT_ORDINAL, 0, 1, 1); diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java index 2590e23..7457557 100644 --- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java +++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java @@ -750,82 +750,7 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni * process */ private void 
startHeartbeatThread() { - checkExecutor.execute(new Runnable() { - @Override - public void run() { - Thread.currentThread().setName("Geode Heartbeat Sender"); - sendPeriodicHeartbeats(); - } - - private void sendPeriodicHeartbeats() { - while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) { - try { - Thread.sleep(memberTimeout / LOGICAL_INTERVAL); - } catch (InterruptedException e) { - return; - } - GMSMembershipView<ID> v = currentView; - if (v != null) { - List<ID> mbrs = v.getMembers(); - int index = mbrs.indexOf(localAddress); - if (index < 0 || mbrs.size() < 2) { - continue; - } - if (!playingDead) { - sendHeartbeats(mbrs, index); - } - } - } - } - - private void sendHeartbeats(List<ID> mbrs, int startIndex) { - ID coordinator = currentView.getCoordinator(); - if (coordinator != null && !coordinator.equals(localAddress)) { - HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1); - message.setRecipient(coordinator); - try { - if (isStopping) { - return; - } - services.getMessenger().sendUnreliably(message); - GMSHealthMonitor.this.stats.incHeartbeatsSent(); - } catch (MembershipClosedException e) { - return; - } - } - - int index = startIndex; - int numSent = 0; - for (;;) { - index--; - if (index < 0) { - index = mbrs.size() - 1; - } - ID mbr = mbrs.get(index); - if (mbr.equals(localAddress)) { - break; - } - if (mbr.equals(coordinator)) { - continue; - } - if (isStopping) { - return; - } - HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1); - message.setRecipient(mbr); - try { - services.getMessenger().sendUnreliably(message); - GMSHealthMonitor.this.stats.incHeartbeatsSent(); - numSent++; - if (numSent >= NUM_HEARTBEATS) { - break; - } - } catch (MembershipClosedException e) { - return; - } - } - } // for (;;) - }); + checkExecutor.execute(new Heart()); } @Override @@ -1530,4 +1455,117 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni public MembershipStatistics getStats() { return 
this.stats; } + + @FunctionalInterface + interface Sleeper { + void sleep(long millis) throws InterruptedException; + } + + @FunctionalInterface + interface NanoTimer { + long nanoTime(); + } + + @FunctionalInterface + interface Warner { + void warn(String message); + } + + class Heart implements Runnable { + + // If we sleep longer than this number of periods then log a warning + public static final int OVERSLEEP_WARNING_THRESHOLD_PERIODS = 2; + public final long sleepPeriodMillis = memberTimeout / LOGICAL_INTERVAL; + public final long sleepPeriodNanos = + TimeUnit.NANOSECONDS.convert(sleepPeriodMillis, TimeUnit.MILLISECONDS); + public final long sleepLimitNanos = OVERSLEEP_WARNING_THRESHOLD_PERIODS * sleepPeriodNanos; + + @Override + public void run() { + Thread.currentThread().setName("Geode Heartbeat Sender"); + sendPeriodicHeartbeats(Thread::sleep, System::nanoTime, logger::warn); + } + + @VisibleForTesting + void sendPeriodicHeartbeats(final Sleeper sleeper, + final NanoTimer nanoTimer, + final Warner warner) { + while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) { + try { + final long timeBeforeSleep = nanoTimer.nanoTime(); + sleeper.sleep(sleepPeriodMillis); + final long timeAfterSleep = nanoTimer.nanoTime(); + final long asleepNanos = timeAfterSleep - timeBeforeSleep; + if (asleepNanos > sleepLimitNanos) { + warner.warn( + String.format( + "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: %,d nanoseconds. 
Period: %,d nanoseconds.", + asleepNanos, sleepPeriodNanos)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + GMSMembershipView<ID> v = currentView; + if (v != null) { + List<ID> mbrs = v.getMembers(); + int index = mbrs.indexOf(localAddress); + if (index < 0 || mbrs.size() < 2) { + continue; + } + if (!playingDead) { + sendHeartbeats(mbrs, index); + } + } + } + } + + private void sendHeartbeats(List<ID> mbrs, int startIndex) { + ID coordinator = currentView.getCoordinator(); + if (coordinator != null && !coordinator.equals(localAddress)) { + HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1); + message.setRecipient(coordinator); + try { + if (isStopping) { + return; + } + services.getMessenger().sendUnreliably(message); + GMSHealthMonitor.this.stats.incHeartbeatsSent(); + } catch (MembershipClosedException e) { + return; + } + } + + int index = startIndex; + int numSent = 0; + for (;;) { + index--; + if (index < 0) { + index = mbrs.size() - 1; + } + ID mbr = mbrs.get(index); + if (mbr.equals(localAddress)) { + break; + } + if (mbr.equals(coordinator)) { + continue; + } + if (isStopping) { + return; + } + HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1); + message.setRecipient(mbr); + try { + services.getMessenger().sendUnreliably(message); + GMSHealthMonitor.this.stats.incHeartbeatsSent(); + numSent++; + if (numSent >= NUM_HEARTBEATS) { + break; + } + } catch (MembershipClosedException e) { + return; + } + } + } // for (;;) + } }