This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch release-2.49.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.49.0 by this push:
     new 6ce85e5f8ae Ensure CancellableQueue doesn't keep references to logically removed objects. (#27403) (#27419)
6ce85e5f8ae is described below

commit 6ce85e5f8ae908024c2b22afd06901551ce72a49
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon Jul 10 10:26:49 2023 -0400

    Ensure CancellableQueue doesn't keep references to logically removed objects. (#27403) (#27419)
    
    Co-authored-by: scwhittle <scwhit...@users.noreply.github.com>
---
 .../org/apache/beam/sdk/fn/CancellableQueue.java   | 26 +++++++++++++-------
 .../apache/beam/sdk/fn/CancellableQueueTest.java   | 28 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
index 96b0f8b9a0d..77fd73f5655 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
@@ -22,9 +22,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * A simplified {@link ThreadSafe} blocking queue that can be cancelled freeing any blocked {@link
@@ -36,7 +36,7 @@ import org.checkerframework.checker.nullness.qual.NonNull;
 public class CancellableQueue<T extends @NonNull Object> {
 
   private final int capacity;
-  private final Object[] elements;
+  private final @Nullable Object[] elements;
   private final Lock lock;
   private final Condition notFull;
   private final Condition notEmpty;
@@ -86,8 +86,9 @@ public class CancellableQueue<T extends @NonNull Object> {
    *     must invoke {@link #cancel} if the interrupt is unrecoverable.
    * @throws Exception if the queue is cancelled.
    */
+  @SuppressWarnings({"cast"})
   public T take() throws Exception, InterruptedException {
-    Object rval;
+    T rval;
     try {
       lock.lockInterruptibly();
       while (count == 0 && cancellationException == null) {
@@ -97,14 +98,15 @@ public class CancellableQueue<T extends @NonNull Object> {
         throw cancellationException;
       }
 
-      rval = elements[takeIndex];
+      rval = (T) elements[takeIndex];
+      elements[takeIndex] = null;
       takeIndex = (takeIndex + 1) % elements.length;
       count -= 1;
       notFull.signal();
     } finally {
       lock.unlock();
     }
-    return (T) rval;
+    return rval;
   }
 
   /**
@@ -119,6 +121,7 @@ public class CancellableQueue<T extends @NonNull Object> {
     try {
       if (cancellationException == null) {
         cancellationException = exception;
+        clearElementsLocked();
       }
       notEmpty.signalAll();
       notFull.signalAll();
@@ -127,14 +130,21 @@ public class CancellableQueue<T extends @NonNull Object> {
     }
   }
 
+  private void clearElementsLocked() {
+    for (int i = takeIndex; count > 0; i = (i + 1) % elements.length) {
+      elements[i] = null;
+      --count;
+    }
+    addIndex = 0;
+    takeIndex = 0;
+  }
+
   /** Enables the queue to be re-used after it has been cancelled. */
   public void reset() {
     lock.lock();
     try {
       cancellationException = null;
-      addIndex = 0;
-      takeIndex = 0;
-      count = 0;
+      clearElementsLocked();
     } finally {
       lock.unlock();
     }
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java
index b8b577b1805..0e3ae17e50f 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java
@@ -21,7 +21,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -219,4 +221,30 @@ public class CancellableQueueTest {
     queue.cancel(new RuntimeException("Second cancel exception"));
     assertThrows("First cancel exception", RuntimeException.class, () -> queue.take());
   }
+
+  @Test
+  public void testMemoryReferenceOnTake() throws Exception {
+    String s1 = new String("test1");
+    String s2 = new String("test2");
+    WeakReference<String> weakReference1 = new WeakReference<>(s1);
+    WeakReference<String> weakReference2 = new WeakReference<>(s2);
+    CancellableQueue<String> queue = new CancellableQueue<>(100);
+    queue.put(s1);
+    queue.put(s2);
+    s1 = null;
+    s2 = null;
+    System.gc();
+    assertTrue(weakReference1.get() != null);
+    assertTrue(weakReference2.get() != null);
+
+    assertEquals("test1", queue.take());
+    System.gc();
+    assertTrue(weakReference1.get() == null);
+    assertTrue(weakReference2.get() != null);
+
+    queue.reset();
+    System.gc();
+    assertTrue(weakReference1.get() == null);
+    assertTrue(weakReference2.get() == null);
+  }
 }

Reply via email to