This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch support/1.15 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 62e60b5062f4c164f3a8af0b52fbc1d5e39fb6aa Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Mon Sep 12 15:12:14 2022 +0200 GEODE-10420: Finish distribute() work if interrupted (#7854) It is possible that an event of which a gateway sender is to be notified is lost if during the process the thread is interrupted. The reason is that the distribute() method in the AbstractGatewaySender when it catches the InterruptedException at some point, just returns, but does not put the event in the queue and neither drops it. The fix consists of handling the event correctly (put it in the queue or drop it) if the InterruptedException is caught but when the method returns set again the interrupt flag so that the caller is aware. --- .../geode/internal/cache/EntryEventImpl.java | 5 +- .../internal/cache/wan/AbstractGatewaySender.java | 20 ++- .../cache/wan/AbstractGatewaySenderTest.java | 170 +++++++++++++++++++++ 3 files changed, 186 insertions(+), 9 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 16adbeca7a..6a521becbe 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -340,8 +340,9 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, op = other.op; distributedMember = other.distributedMember; filterInfo = other.filterInfo; - keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.keyInfo) - : new KeyInfo(other.keyInfo); + keyInfo = + other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) other.getKeyInfo()) + : new KeyInfo(other.getKeyInfo()); if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) { keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument( (GatewaySenderEventCallbackArgument) other.getRawCallbackArgument()))); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 47fa99e4a0..73d85dd5ac 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1039,6 +1039,7 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di List<Integer> allRemoteDSIds, boolean isLastEventInTransaction) { final boolean isDebugEnabled = logger.isDebugEnabled(); + boolean wasInterrupted = false; // released by this method or transfers ownership to TmpQueueEvent @Released @@ -1153,15 +1154,17 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } } if (enqueuedAllTempQueueEvents) { - try { - while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { - if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { - return; + while (true) { + try { + while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) { + if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) { + return; + } } + break; + } catch (InterruptedException e) { + wasInterrupted = true; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; } } } @@ -1210,6 +1213,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di if (freeClonedEvent) { clonedEvent.release(); // fix for bug 48035 } + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java index aac5f0d3c0..d57ba5f999 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java @@ -18,15 +18,28 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.junit.Test; +import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.wan.GatewayQueueEvent; +import org.apache.geode.distributed.internal.DistributionAdvisor; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EnumListenerEvent; +import org.apache.geode.internal.cache.InternalRegion; +import org.apache.geode.internal.cache.KeyInfo; import org.apache.geode.internal.cache.RegionQueue; public class AbstractGatewaySenderTest { @@ -58,4 +71,161 @@ public class AbstractGatewaySenderTest { assertThat(event).isSameAs(gatewaySenderEvent); } + + @Test + public void distributeFinishesWorkWhenInterrupted() throws InterruptedException { + DummyGatewaySenderEventProcessor processor = new DummyGatewaySenderEventProcessor(); + TestableGatewaySender gatewaySender = new TestableGatewaySender(processor); + EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE; + EntryEventImpl event = mock(EntryEventImpl.class); + when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class)); + Operation operation = mock(Operation.class); + when(operation.isLocal()).thenReturn(false); + when(operation.isExpiration()).thenReturn(false); + when(event.getOperation()).thenReturn(operation); + InternalRegion region = mock(InternalRegion.class); + when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION); + when(event.getRegion()).thenReturn(region); + List<Integer> allRemoteDSIds = Collections.singletonList(1); + + CountDownLatch lockAcquiredLatch = new CountDownLatch(1); + CountDownLatch unlockLatch = new CountDownLatch(1); + + // Get lifeCycleLock in write mode in new thread so that + // the thread calling distribute will not be able + // to acquire it + Thread thread = new Thread(() -> { + gatewaySender.getLifeCycleLock().writeLock().lock(); + lockAcquiredLatch.countDown(); + try { + unlockLatch.await(); + } catch (InterruptedException ignore) { + } + gatewaySender.getLifeCycleLock().writeLock().unlock(); + }); + thread.start(); + lockAcquiredLatch.await(); + + // Send interrupted and then call distribute + Thread.currentThread().interrupt(); + gatewaySender.distribute(operationType, event, allRemoteDSIds, true); + + unlockLatch.countDown(); + + // Check that the interrupted exception has been reset + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + // Check that the work was finished even if the interrupt signal was set + assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1); + } + + public static class TestableGatewaySender extends AbstractGatewaySender { + private int isRunningTimesCalled = 0; + + public TestableGatewaySender(AbstractGatewaySenderEventProcessor eventProcessor) { + this.eventProcessor = eventProcessor; + enqueuedAllTempQueueEvents = true; + } + + @Override + public void fillInProfile(DistributionAdvisor.Profile profile) {} + + @Override + public void start() {} + + @Override + public boolean isPrimary() { + return true; + } + + @Override + public void startWithCleanQueue() {} + + @Override + public void stop() {} + + @Override + public void setModifiedEventId(EntryEventImpl clonedEvent) {} + + @Override + public GatewaySenderStats getStatistics() { + return mock(GatewaySenderStats.class); + } + + @Override + public GatewaySenderAdvisor getSenderAdvisor() { + return mock(GatewaySenderAdvisor.class); + } + + @Override + public boolean isRunning() { + if (isRunningTimesCalled++ == 0) { + return true; + } + return false; + } + + @Override + public String getId() { + return "test"; + } + } + + public static class DummyGatewaySenderEventProcessor extends AbstractGatewaySenderEventProcessor { + + private int timesEnqueueEventCalled = 0; + private int timesRegisterEventDroppedInPrimaryQueueCalled = 0; + + public DummyGatewaySenderEventProcessor() { + super("", new DummyGatewaySender(), null); + } + + @Override + public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, Object substituteValue, + boolean isLastEventInTransaction) throws IOException, CacheException { + timesEnqueueEventCalled++; + } + + public int getTimesEnqueueEventCalled() { + return timesEnqueueEventCalled; + } + + @Override + protected void initializeMessageQueue(String id, boolean cleanQueues) {} + + @Override + protected void rebalance() {} + + public int getTimesRegisterEventDroppedInPrimaryQueueCalled() { + return timesRegisterEventDroppedInPrimaryQueueCalled; + } + + @Override + protected void registerEventDroppedInPrimaryQueue(EntryEventImpl droppedEvent) { + timesRegisterEventDroppedInPrimaryQueueCalled++; + } + + @Override + public void initializeEventDispatcher() {} + + @Override + protected void enqueueEvent(GatewayQueueEvent event) {} + } + + public static class DummyGatewaySender extends AbstractGatewaySender { + @Override + public void fillInProfile(DistributionAdvisor.Profile profile) {} + + @Override + public void start() {} + + @Override + public void startWithCleanQueue() {} + + @Override + public void stop() {} + + @Override + public void setModifiedEventId(EntryEventImpl clonedEvent) {} + + } }