This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aedfa461fb3 [#20970] Fix gRPC leak by closing ResidualSource at 
BoundedToUnboundedSourceAdapter.Reader#init() in Dataflow worker (#28548)
aedfa461fb3 is described below

commit aedfa461fb354e0e97d75694d2a117d9ad505602
Author: Minbo Bae <49642083+baemi...@users.noreply.github.com>
AuthorDate: Wed Oct 11 10:57:24 2023 -0700

    [#20970] Fix gRPC leak by closing ResidualSource at 
BoundedToUnboundedSourceAdapter.Reader#init() in Dataflow worker (#28548)
---
 .../UnboundedReadFromBoundedSource.java            |  14 +-
 .../UnboundedReadFromBoundedSourceTest.java        | 180 +++++++++++++++++++++
 .../dataflow/worker/WorkerCustomSources.java       |   7 +-
 3 files changed, 198 insertions(+), 3 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 67697636a36..53fad782da9 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -288,6 +288,15 @@ public class UnboundedReadFromBoundedSource<T> extends 
PTransform<PBegin, PColle
             residualElementsList == null
                 ? new ResidualElements(Collections.emptyList())
                 : new ResidualElements(residualElementsList);
+
+        if (this.residualSource != null) {
+          // Close the current residualSource so that the reader it holds is 
closed, not leaked.
+          try {
+            this.residualSource.close();
+          } catch (IOException e) {
+            LOG.warn("Ignore error at closing ResidualSource", e);
+          }
+        }
         this.residualSource =
             residualSource == null ? null : new ResidualSource(residualSource, 
options);
       }
@@ -465,7 +474,7 @@ public class UnboundedReadFromBoundedSource<T> extends 
PTransform<PBegin, PColle
       }
 
       private boolean advance() throws IOException {
-        checkArgument(!closed, "advance() call on closed %s", 
getClass().getName());
+        checkState(!closed, "advance() call on closed %s", 
getClass().getName());
         if (readerDone) {
           return false;
         }
@@ -505,6 +514,7 @@ public class UnboundedReadFromBoundedSource<T> extends 
PTransform<PBegin, PColle
       }
 
       Checkpoint<T> getCheckpointMark() {
+        checkState(!closed, "getCheckpointMark() call on closed %s", 
getClass().getName());
         if (reader == null) {
           // Reader hasn't started, checkpoint the residualSource.
           return new Checkpoint<>(null /* residualElements */, residualSource);
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index cd4b49262fc..31f6842a42b 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -26,9 +26,15 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
 import 
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
@@ -69,10 +75,14 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Unit tests for {@link UnboundedReadFromBoundedSource}. */
 @RunWith(JUnit4.class)
 public class UnboundedReadFromBoundedSourceTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UnboundedReadFromBoundedSourceTest.class);
 
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -280,6 +290,38 @@ public class UnboundedReadFromBoundedSourceTest {
     unboundedSource.createReader(options, checkpoint).getCurrent();
   }
 
+  @Test
+  public void testReadersClosedProperly() throws IOException {
+    ManagedReaderBoundedSource boundedSource = new 
ManagedReaderBoundedSource(0, 10);
+    BoundedToUnboundedSourceAdapter<Integer> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    BoundedToUnboundedSourceAdapter<Integer>.Reader reader =
+        unboundedSource.createReader(options, new Checkpoint<Integer>(null, 
boundedSource));
+
+    for (int i = 0; i < 3; ++i) {
+      if (i == 0) {
+        assertTrue(reader.start());
+      } else {
+        assertTrue(reader.advance());
+      }
+      assertEquals(i, (int) reader.getCurrent());
+    }
+    Checkpoint<Integer> checkpoint = reader.getCheckpointMark();
+    List<TimestampedValue<Integer>> residualElements = 
checkpoint.getResidualElements();
+    for (int i = 0; i < 7; ++i) {
+      TimestampedValue<Integer> element = residualElements.get(i);
+      assertEquals(i + 3, (int) element.getValue());
+    }
+    for (int i = 0; i < 100; ++i) {
+      // A WeakReference to an object that nothing else references is not 
immediately added to
+      // its ReferenceQueue, so cleanQueue() (which calls System.gc()) is run 
multiple times.
+      // If a reader is GCed without closing, `cleanQueue` throws a 
RuntimeException.
+      boundedSource.cleanQueue();
+    }
+  }
+
   /** Generate byte array of given size. */
   private static byte[] generateInput(int size) {
     // Arbitrary but fixed seed
@@ -298,6 +340,7 @@ public class UnboundedReadFromBoundedSourceTest {
 
   /** Unsplittable source for use in tests. */
   private static class UnsplittableSource extends FileBasedSource<Byte> {
+
     public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
       super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
     }
@@ -323,6 +366,7 @@ public class UnboundedReadFromBoundedSourceTest {
     }
 
     private static class UnsplittableReader extends FileBasedReader<Byte> {
+
       ByteBuffer buff = ByteBuffer.allocate(1);
       Byte current;
       long offset;
@@ -370,4 +414,140 @@ public class UnboundedReadFromBoundedSourceTest {
       }
     }
   }
+
+  /**
+   * An integer generating bounded source. This source class checks if readers 
are closed properly.
+   * For that, it manages weak references of readers, and checks at 
`createReader` and `cleanQueue`
+   * if readers were closed before being GCed. `cleanQueue` does not change the 
state in
+   * `ManagedReaderBoundedSource`, but throws an exception if it finds a 
reader GCed without
+   * closing.
+   */
+  private static class ManagedReaderBoundedSource extends 
BoundedSource<Integer> {
+
+    private final int from;
+    private final int to; // exclusive
+
+    private transient ReferenceQueue<ManagedReader> refQueue;
+    private transient Map<Reference<ManagedReader>, CloseStatus> 
cloesStatusMap;
+
+    public ManagedReaderBoundedSource(int from, int to) {
+      if (from > to) {
+        throw new RuntimeException(
+            String.format("`from` <= `to`, but got from: %d, to: %d", from, 
to));
+      }
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public List<? extends BoundedSource<Integer>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) {
+      return Collections.singletonList(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) {
+      return (to - from) * 4L;
+    }
+
+    @Override
+    public BoundedReader<Integer> createReader(PipelineOptions options) {
+      // Add weak reference to queue to monitor GCed readers. If `CloseStatus` 
associated with
+      // reader is not closed, it means a reader was GCed without closing 
properly. The CloseStatus
+      // check for GCed readers is done in cleanQueue().
+      if (refQueue == null) {
+        refQueue = new ReferenceQueue<>();
+        cloesStatusMap = new HashMap<>();
+      }
+      cleanQueue();
+
+      CloseStatus status = new CloseStatus();
+      ManagedReader reader = new ManagedReader(status);
+      WeakReference<ManagedReader> reference = new WeakReference<>(reader, 
refQueue);
+      cloesStatusMap.put(reference, status);
+      LOG.info("Add reference {} for reader {}", reference, reader);
+      return reader;
+    }
+
+    public void cleanQueue() {
+      System.gc();
+
+      Reference<? extends ManagedReader> reference;
+      while ((reference = refQueue.poll()) != null) {
+        CloseStatus closeStatus = cloesStatusMap.get(reference);
+        LOG.info("Poll reference: {}, closed: {}", reference, 
closeStatus.closed);
+        closeStatus.throwIfNotClosed();
+      }
+    }
+
+    class CloseStatus {
+
+      private final RuntimeException allocationStacktrace;
+
+      private boolean closed;
+
+      public CloseStatus() {
+        allocationStacktrace =
+            new RuntimeException("Previous reader was not closed properly. 
Reader allocation was");
+        closed = false;
+      }
+
+      void close() {
+        cleanQueue();
+        closed = true;
+      }
+
+      void throwIfNotClosed() {
+        if (!closed) {
+          throw allocationStacktrace;
+        }
+      }
+    }
+
+    class ManagedReader extends BoundedReader<Integer> {
+
+      private final CloseStatus status;
+
+      int current;
+
+      public ManagedReader(CloseStatus status) {
+        this.status = status;
+      }
+
+      @Override
+      public boolean start() {
+        if (from < to) {
+          current = from;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public boolean advance() {
+        if (current + 1 < to) {
+          ++current;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public Integer getCurrent() {
+        return current;
+      }
+
+      @Override
+      public void close() {
+        status.close();
+      }
+
+      @Override
+      public BoundedSource<Integer> getCurrentSource() {
+        return ManagedReaderBoundedSource.this;
+      }
+    }
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 872dc1e89a7..a9050236efc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -776,6 +776,9 @@ public class WorkerCustomSources {
 
   private static class UnboundedReaderIterator<T>
       extends 
NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
+    // Do not close reader. The reader is cached in 
StreamingModeExecutionContext.readerCache, and
+    // will be reused until the cache is evicted, expired or invalidated.
+    // See UnboundedReader#iterator().
     private final UnboundedSource.UnboundedReader<T> reader;
     private final StreamingModeExecutionContext context;
     private final boolean started;
@@ -862,7 +865,9 @@ public class WorkerCustomSources {
     }
 
     @Override
-    public void close() {}
+    public void close() {
+      // Don't close reader.
+    }
 
     @Override
     public NativeReader.Progress getProgress() {

Reply via email to