This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 62e60b5062f4c164f3a8af0b52fbc1d5e39fb6aa
Author: Alberto Gomez <alberto.go...@est.tech>
AuthorDate: Mon Sep 12 15:12:14 2022 +0200

    GEODE-10420: Finish distribute() work if interrupted (#7854)
    
    An event that a gateway sender should be notified of
    can be lost if the thread is interrupted while the
    event is being distributed.
    
    The reason is that when the distribute() method
    in AbstractGatewaySender catches an
    InterruptedException, it just returns: it neither
    puts the event in the queue nor drops it.
    
    The fix is to handle the event correctly
    (put it in the queue or drop it) even if an
    InterruptedException is caught, and to set the
    interrupt flag again before the method returns so
    that the caller is aware of the interruption.
---
 .../geode/internal/cache/EntryEventImpl.java       |   5 +-
 .../internal/cache/wan/AbstractGatewaySender.java  |  20 ++-
 .../cache/wan/AbstractGatewaySenderTest.java       | 170 +++++++++++++++++++++
 3 files changed, 186 insertions(+), 9 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 16adbeca7a..6a521becbe 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -340,8 +340,9 @@ public class EntryEventImpl implements InternalEntryEvent, 
InternalCacheEvent,
     op = other.op;
     distributedMember = other.distributedMember;
     filterInfo = other.filterInfo;
-    keyInfo = other.keyInfo.isDistKeyInfo() ? new 
DistTxKeyInfo((DistTxKeyInfo) other.keyInfo)
-        : new KeyInfo(other.keyInfo);
+    keyInfo =
+        other.getKeyInfo().isDistKeyInfo() ? new DistTxKeyInfo((DistTxKeyInfo) 
other.getKeyInfo())
+            : new KeyInfo(other.getKeyInfo());
     if (other.getRawCallbackArgument() instanceof 
GatewaySenderEventCallbackArgument) {
       keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument(
           (GatewaySenderEventCallbackArgument) 
other.getRawCallbackArgument())));
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 47fa99e4a0..73d85dd5ac 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1039,6 +1039,7 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
       List<Integer> allRemoteDSIds, boolean isLastEventInTransaction) {
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
+    boolean wasInterrupted = false;
 
     // released by this method or transfers ownership to TmpQueueEvent
     @Released
@@ -1153,15 +1154,17 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
           }
         }
         if (enqueuedAllTempQueueEvents) {
-          try {
-            while (!getLifeCycleLock().readLock().tryLock(10, 
TimeUnit.MILLISECONDS)) {
-              if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, 
clonedEvent)) {
-                return;
+          while (true) {
+            try {
+              while (!getLifeCycleLock().readLock().tryLock(10, 
TimeUnit.MILLISECONDS)) {
+                if (!getIsRunningAndDropEventIfNotRunning(event, 
isDebugEnabled, clonedEvent)) {
+                  return;
+                }
               }
+              break;
+            } catch (InterruptedException e) {
+              wasInterrupted = true;
             }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            return;
           }
         }
       }
@@ -1210,6 +1213,9 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
       if (freeClonedEvent) {
         clonedEvent.release(); // fix for bug 48035
       }
+      if (wasInterrupted) {
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
index aac5f0d3c0..d57ba5f999 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderTest.java
@@ -18,15 +18,28 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.KeyInfo;
 import org.apache.geode.internal.cache.RegionQueue;
 
 public class AbstractGatewaySenderTest {
@@ -58,4 +71,161 @@ public class AbstractGatewaySenderTest {
 
     assertThat(event).isSameAs(gatewaySenderEvent);
   }
+
+  @Test
+  public void distributeFinishesWorkWhenInterrupted() throws 
InterruptedException {
+    DummyGatewaySenderEventProcessor processor = new 
DummyGatewaySenderEventProcessor();
+    TestableGatewaySender gatewaySender = new TestableGatewaySender(processor);
+    EnumListenerEvent operationType = EnumListenerEvent.AFTER_CREATE;
+    EntryEventImpl event = mock(EntryEventImpl.class);
+    when(event.getKeyInfo()).thenReturn(mock(KeyInfo.class));
+    Operation operation = mock(Operation.class);
+    when(operation.isLocal()).thenReturn(false);
+    when(operation.isExpiration()).thenReturn(false);
+    when(event.getOperation()).thenReturn(operation);
+    InternalRegion region = mock(InternalRegion.class);
+    when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+    when(event.getRegion()).thenReturn(region);
+    List<Integer> allRemoteDSIds = Collections.singletonList(1);
+
+    CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
+    CountDownLatch unlockLatch = new CountDownLatch(1);
+
+    // Get lifeCycleLock in write mode in new thread so that
+    // the thread calling distribute will not be able
+    // to acquire it
+    Thread thread = new Thread(() -> {
+      gatewaySender.getLifeCycleLock().writeLock().lock();
+      lockAcquiredLatch.countDown();
+      try {
+        unlockLatch.await();
+      } catch (InterruptedException ignore) {
+      }
+      gatewaySender.getLifeCycleLock().writeLock().unlock();
+    });
+    thread.start();
+    lockAcquiredLatch.await();
+
+    // Set the interrupt flag on this thread and then call distribute
+    Thread.currentThread().interrupt();
+    gatewaySender.distribute(operationType, event, allRemoteDSIds, true);
+
+    unlockLatch.countDown();
+
+    // Check that the interrupt flag has been set again
+    assertThat(Thread.currentThread().isInterrupted()).isTrue();
+    // Check that the work was finished even if the interrupt signal was set
+    
assertThat(processor.getTimesRegisterEventDroppedInPrimaryQueueCalled()).isEqualTo(1);
+  }
+
+  public static class TestableGatewaySender extends AbstractGatewaySender {
+    private int isRunningTimesCalled = 0;
+
+    public TestableGatewaySender(AbstractGatewaySenderEventProcessor 
eventProcessor) {
+      this.eventProcessor = eventProcessor;
+      enqueuedAllTempQueueEvents = true;
+    }
+
+    @Override
+    public void fillInProfile(DistributionAdvisor.Profile profile) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public boolean isPrimary() {
+      return true;
+    }
+
+    @Override
+    public void startWithCleanQueue() {}
+
+    @Override
+    public void stop() {}
+
+    @Override
+    public void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+    @Override
+    public GatewaySenderStats getStatistics() {
+      return mock(GatewaySenderStats.class);
+    }
+
+    @Override
+    public GatewaySenderAdvisor getSenderAdvisor() {
+      return mock(GatewaySenderAdvisor.class);
+    }
+
+    @Override
+    public boolean isRunning() {
+      if (isRunningTimesCalled++ == 0) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public String getId() {
+      return "test";
+    }
+  }
+
+  public static class DummyGatewaySenderEventProcessor extends 
AbstractGatewaySenderEventProcessor {
+
+    private int timesEnqueueEventCalled = 0;
+    private int timesRegisterEventDroppedInPrimaryQueueCalled = 0;
+
+    public DummyGatewaySenderEventProcessor() {
+      super("", new DummyGatewaySender(), null);
+    }
+
+    @Override
+    public void enqueueEvent(EnumListenerEvent operation, EntryEvent event, 
Object substituteValue,
+        boolean isLastEventInTransaction) throws IOException, CacheException {
+      timesEnqueueEventCalled++;
+    }
+
+    public int getTimesEnqueueEventCalled() {
+      return timesEnqueueEventCalled;
+    }
+
+    @Override
+    protected void initializeMessageQueue(String id, boolean cleanQueues) {}
+
+    @Override
+    protected void rebalance() {}
+
+    public int getTimesRegisterEventDroppedInPrimaryQueueCalled() {
+      return timesRegisterEventDroppedInPrimaryQueueCalled;
+    }
+
+    @Override
+    protected void registerEventDroppedInPrimaryQueue(EntryEventImpl 
droppedEvent) {
+      timesRegisterEventDroppedInPrimaryQueueCalled++;
+    }
+
+    @Override
+    public void initializeEventDispatcher() {}
+
+    @Override
+    protected void enqueueEvent(GatewayQueueEvent event) {}
+  }
+
+  public static class DummyGatewaySender extends AbstractGatewaySender {
+    @Override
+    public void fillInProfile(DistributionAdvisor.Profile profile) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void startWithCleanQueue() {}
+
+    @Override
+    public void stop() {}
+
+    @Override
+    public void setModifiedEventId(EntryEventImpl clonedEvent) {}
+
+  }
 }

Reply via email to