This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78b5ffbb82c Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunner (#26274)
78b5ffbb82c is described below

commit 78b5ffbb82c7220c4a0c6c4755f23daccb53a159
Author: Katie Liu <kati...@linkedin.com>
AuthorDate: Fri Apr 14 11:36:27 2023 -0700

    Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunner (#26274)
---
 .../apache/beam/runners/samza/runtime/DoFnOp.java  | 74 ----------------
 .../runners/samza/runtime/FutureCollectorImpl.java | 99 ++++++++++++++++++++++
 .../runners/samza/runtime/AsyncDoFnRunnerTest.java |  2 +-
 .../samza/runtime/FutureCollectorImplTest.java     |  6 +-
 4 files changed, 103 insertions(+), 78 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index c693754b5b9..5ee763fca34 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -26,9 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -46,7 +44,6 @@ import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.samza.SamzaExecutionContext;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.util.DoFnUtils;
-import org.apache.beam.runners.samza.util.FutureUtils;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
@@ -484,77 +481,6 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
                 windowedValue.getPane()));
   }
 
-  static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
-    private final AtomicBoolean collectorSealed;
-    private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;
-
-    FutureCollectorImpl() {
-      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
-      collectorSealed = new AtomicBoolean(true);
-    }
-
-    @Override
-    public void add(CompletionStage<WindowedValue<OutT>> element) {
-      checkState(
-          !collectorSealed.get(),
-          "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements.");
-
-      // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs.
-      synchronized (this) {
-        outputFuture =
-            outputFuture.thenCombine(
-                element,
-                (collection, event) -> {
-                  collection.add(event);
-                  return collection;
-                });
-      }
-    }
-
-    @Override
-    public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
-      checkState(
-          !collectorSealed.get(),
-          "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
-
-      synchronized (this) {
-        outputFuture = FutureUtils.combineFutures(outputFuture, elements);
-      }
-    }
-
-    @Override
-    public void discard() {
-      collectorSealed.compareAndSet(false, true);
-
-      synchronized (this) {
-        outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
-      }
-    }
-
-    @Override
-    public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
-      /*
-       * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op
-       * and an empty collection will be returned.
-       */
-      collectorSealed.compareAndSet(false, true);
-
-      synchronized (this) {
-        final CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture = outputFuture;
-        outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
-        return sealedOutputFuture;
-      }
-    }
-
-    @Override
-    public void prepare() {
-      boolean isCollectorSealed = collectorSealed.compareAndSet(true, false);
-      checkState(
-          isCollectorSealed,
-          "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
-    }
-  }
-
   /**
    * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that
    * emits values to the main output only, which is a single {@link
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java
new file mode 100644
index 00000000000..10f915a8baf
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.samza.util.FutureUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+class FutureCollectorImpl<OutT> implements FutureCollector<OutT> {
+  private final AtomicBoolean collectorSealed;
+  private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture;
+
+  FutureCollectorImpl() {
+    outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
+    collectorSealed = new AtomicBoolean(true);
+  }
+
+  @Override
+  public void add(CompletionStage<WindowedValue<OutT>> element) {
+    checkState(
+        !collectorSealed.get(),
+        "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements.");
+
+    // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs.
+    synchronized (this) {
+      outputFuture =
+          outputFuture.thenCombine(
+              element,
+              (collection, event) -> {
+                collection.add(event);
+                return collection;
+              });
+    }
+  }
+
+  @Override
+  public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) {
+    checkState(
+        !collectorSealed.get(),
+        "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements.");
+
+    synchronized (this) {
+      outputFuture = FutureUtils.combineFutures(outputFuture, elements);
+    }
+  }
+
+  @Override
+  public void discard() {
+    collectorSealed.compareAndSet(false, true);
+
+    synchronized (this) {
+      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
+    }
+  }
+
+  @Override
+  public CompletionStage<Collection<WindowedValue<OutT>>> finish() {
+    /*
+     * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op
+     * and an empty collection will be returned.
+     */
+    collectorSealed.compareAndSet(false, true);
+
+    synchronized (this) {
+      final CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture = outputFuture;
+      outputFuture = CompletableFuture.completedFuture(new ArrayList<>());
+      return sealedOutputFuture;
+    }
+  }
+
+  @Override
+  public void prepare() {
+    boolean isCollectorSealed = collectorSealed.compareAndSet(true, false);
+    checkState(
+        isCollectorSealed,
+        "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked.");
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java
index 6add2f07986..983275c7b47 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java
@@ -205,7 +205,7 @@ public class AsyncDoFnRunnerTest implements Serializable {
     options.setNumThreadsForProcessElement(4);
 
     final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>();
-    final FutureCollector<Void> futureCollector = new DoFnOp.FutureCollectorImpl<>();
+    final FutureCollector<Void> futureCollector = new FutureCollectorImpl<>();
     futureCollector.prepare();
 
     final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner =
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
index f126dd14b83..58d0fbd3f1b 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java
@@ -31,14 +31,14 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */
+/** Unit tests for {@linkplain FutureCollectorImpl}. */
 public final class FutureCollectorImplTest {
   private static final List<String> RESULTS = ImmutableList.of("hello", "world");
-  private FutureCollector<String> futureCollector = new DoFnOp.FutureCollectorImpl<>();
+  private FutureCollector<String> futureCollector = new FutureCollectorImpl<>();
 
   @Before
   public void setup() {
-    futureCollector = new DoFnOp.FutureCollectorImpl<>();
+    futureCollector = new FutureCollectorImpl<>();
   }
 
   @Test(expected = IllegalStateException.class)

Reply via email to