This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new a47ff44 IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259. a47ff44 is described below commit a47ff44f40a1dbdeaa03289966ce8055e5b0127f Author: Aleksey Plekhanov <plehanov.a...@gmail.com> AuthorDate: Fri Jul 16 09:32:54 2021 +0300 IGNITE-15099 Fix concurrent heartbeat update while in blocking section for system workers - Fixes #9259. Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com> --- .../ignite/internal/util/worker/GridWorker.java | 18 +++++++- .../ignite/failure/SystemWorkersBlockingTest.java | 52 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java index 5926b9c..615d506 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/GridWorker.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.worker; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInterruptedCheckedException; @@ -56,6 +57,10 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher { /** Timestamp to be updated by this worker periodically to indicate it's up and running. */ private volatile long heartbeatTs; + /** Atomic field updater to change heartbeat. */ + private static final AtomicLongFieldUpdater<GridWorker> HEARTBEAT_UPDATER = + AtomicLongFieldUpdater.newUpdater(GridWorker.class, "heartbeatTs"); + /** Mutex for finish awaiting. 
*/ private final Object mux = new Object(); @@ -273,7 +278,16 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher { /** {@inheritDoc} */ @Override public void updateHeartbeat() { - heartbeatTs = U.currentTimeMillis(); + long curTs = U.currentTimeMillis(); + long hbTs = heartbeatTs; + + // Avoid heartbeat update while in the blocking section. + while (hbTs < curTs) { + if (HEARTBEAT_UPDATER.compareAndSet(this, hbTs, curTs)) + return; + + hbTs = heartbeatTs; + } } /** {@inheritDoc} */ @@ -283,7 +297,7 @@ public abstract class GridWorker implements Runnable, WorkProgressDispatcher { /** {@inheritDoc} */ @Override public void blockingSectionEnd() { - updateHeartbeat(); + heartbeatTs = U.currentTimeMillis(); } /** Can be called from {@link #runner()} thread to perform idleness handling. */ diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java index ccfc507..57495da 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.testframework.GridTestUtils; @@ -127,6 +128,57 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. 
+ */ + @Test + public void testBlockingSection() throws Exception { + IgniteEx ignite = startGrid(0); + + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch blockingSectionLatch = new CountDownLatch(1); + CountDownLatch endLatch = new CountDownLatch(1); + + GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) { + @Override protected void body() { + blockingSectionBegin(); + + try { + startLatch.countDown(); + + blockingSectionLatch.await(); + } + catch (Exception ignore) { + // No-op. + } + finally { + blockingSectionEnd(); + + endLatch.countDown(); + } + } + }; + + runWorker(worker); + + ignite.context().workersRegistry().register(worker); + + startLatch.await(); + + // Check that concurrent heartbeat update doesn't affect the blocking section. + worker.updateHeartbeat(); + + Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT); + + blockingSectionLatch.countDown(); + + endLatch.await(); + + assertNull(failureError.get()); + + assertTrue(worker.heartbeatTs() <= U.currentTimeMillis()); + } + + /** * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single registered {@link GridWorker} * doesn't lead to infinite loop. *