This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch release-2.49.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.49.0 by this push: new 6ce85e5f8ae Ensure CancellableQueue doesn't keep references to logically removed objects. (#27403) (#27419) 6ce85e5f8ae is described below commit 6ce85e5f8ae908024c2b22afd06901551ce72a49 Author: Yi Hu <ya...@google.com> AuthorDate: Mon Jul 10 10:26:49 2023 -0400 Ensure CancellableQueue doesn't keep references to logically removed objects. (#27403) (#27419) Co-authored-by: scwhittle <scwhit...@users.noreply.github.com> --- .../org/apache/beam/sdk/fn/CancellableQueue.java | 26 +++++++++++++------- .../apache/beam/sdk/fn/CancellableQueueTest.java | 28 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java index 96b0f8b9a0d..77fd73f5655 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java @@ -22,9 +22,9 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A simplified {@link ThreadSafe} blocking queue that can be cancelled freeing any blocked {@link @@ -36,7 +36,7 @@ import org.checkerframework.checker.nullness.qual.NonNull; public class CancellableQueue<T extends @NonNull Object> { private final int capacity; - private final Object[] elements; + private final @Nullable Object[] elements; private final Lock lock; private final Condition notFull; private final Condition notEmpty; @@ -86,8 +86,9 @@ public class CancellableQueue<T extends @NonNull Object> { * must invoke {@link #cancel} if the interrupt is unrecoverable. * @throws Exception if the queue is cancelled. */ + @SuppressWarnings({"cast"}) public T take() throws Exception, InterruptedException { - Object rval; + T rval; try { lock.lockInterruptibly(); while (count == 0 && cancellationException == null) { @@ -97,14 +98,15 @@ public class CancellableQueue<T extends @NonNull Object> { throw cancellationException; } - rval = elements[takeIndex]; + rval = (T) elements[takeIndex]; + elements[takeIndex] = null; takeIndex = (takeIndex + 1) % elements.length; count -= 1; notFull.signal(); } finally { lock.unlock(); } - return (T) rval; + return rval; } /** @@ -119,6 +121,7 @@ public class CancellableQueue<T extends @NonNull Object> { try { if (cancellationException == null) { cancellationException = exception; + clearElementsLocked(); } notEmpty.signalAll(); notFull.signalAll(); @@ -127,14 +130,21 @@ public class CancellableQueue<T extends @NonNull Object> { } } + private void clearElementsLocked() { + for (int i = takeIndex; count > 0; i = (i + 1) % elements.length) { + elements[i] = null; + --count; + } + addIndex = 0; + takeIndex = 0; + } + /** Enables the queue to be re-used after it has been cancelled. */ public void reset() { lock.lock(); try { cancellationException = null; - addIndex = 0; - takeIndex = 0; - count = 0; + clearElementsLocked(); } finally { lock.unlock(); } diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java index b8b577b1805..0e3ae17e50f 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/CancellableQueueTest.java @@ -21,7 +21,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -219,4 +221,30 @@ public class CancellableQueueTest { queue.cancel(new RuntimeException("Second cancel exception")); assertThrows("First cancel exception", RuntimeException.class, () -> queue.take()); } + + @Test + public void testMemoryReferenceOnTake() throws Exception { + String s1 = new String("test1"); + String s2 = new String("test2"); + WeakReference<String> weakReference1 = new WeakReference<>(s1); + WeakReference<String> weakReference2 = new WeakReference<>(s2); + CancellableQueue<String> queue = new CancellableQueue<>(100); + queue.put(s1); + queue.put(s2); + s1 = null; + s2 = null; + System.gc(); + assertTrue(weakReference1.get() != null); + assertTrue(weakReference2.get() != null); + + assertEquals("test1", queue.take()); + System.gc(); + assertTrue(weakReference1.get() == null); + assertTrue(weakReference2.get() != null); + + queue.reset(); + System.gc(); + assertTrue(weakReference1.get() == null); + assertTrue(weakReference2.get() == null); + } }