This is an automated email from the ASF dual-hosted git repository. xinyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 78b5ffbb82c Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunner (#26274) 78b5ffbb82c is described below commit 78b5ffbb82c7220c4a0c6c4755f23daccb53a159 Author: Katie Liu <kati...@linkedin.com> AuthorDate: Fri Apr 14 11:36:27 2023 -0700 Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunner (#26274) --- .../apache/beam/runners/samza/runtime/DoFnOp.java | 74 ---------------- .../runners/samza/runtime/FutureCollectorImpl.java | 99 ++++++++++++++++++++++ .../runners/samza/runtime/AsyncDoFnRunnerTest.java | 2 +- .../samza/runtime/FutureCollectorImplTest.java | 6 +- 4 files changed, 103 insertions(+), 78 deletions(-) diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index c693754b5b9..5ee763fca34 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -26,9 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.ServiceLoader; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.DoFnRunner; @@ -46,7 +44,6 @@ import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.samza.SamzaExecutionContext; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.util.DoFnUtils; -import org.apache.beam.runners.samza.util.FutureUtils; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; @@ -484,77 +481,6 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> { windowedValue.getPane())); } - static class FutureCollectorImpl<OutT> implements FutureCollector<OutT> { - private final AtomicBoolean collectorSealed; - private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture; - - FutureCollectorImpl() { - outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); - collectorSealed = new AtomicBoolean(true); - } - - @Override - public void add(CompletionStage<WindowedValue<OutT>> element) { - checkState( - !collectorSealed.get(), - "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements."); - - // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs. - synchronized (this) { - outputFuture = - outputFuture.thenCombine( - element, - (collection, event) -> { - collection.add(event); - return collection; - }); - } - } - - @Override - public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) { - checkState( - !collectorSealed.get(), - "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements."); - - synchronized (this) { - outputFuture = FutureUtils.combineFutures(outputFuture, elements); - } - } - - @Override - public void discard() { - collectorSealed.compareAndSet(false, true); - - synchronized (this) { - outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); - } - } - - @Override - public CompletionStage<Collection<WindowedValue<OutT>>> finish() { - /* - * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op - * and an empty collection will be returned. - */ - collectorSealed.compareAndSet(false, true); - - synchronized (this) { - final CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture = outputFuture; - outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); - return sealedOutputFuture; - } - } - - @Override - public void prepare() { - boolean isCollectorSealed = collectorSealed.compareAndSet(true, false); - checkState( - isCollectorSealed, - "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked."); - } - } - /** * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that * emits values to the main output only, which is a single {@link diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java new file mode 100644 index 00000000000..10f915a8baf --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.samza.util.FutureUtils; +import org.apache.beam.sdk.util.WindowedValue; + +class FutureCollectorImpl<OutT> implements FutureCollector<OutT> { + private final AtomicBoolean collectorSealed; + private CompletionStage<Collection<WindowedValue<OutT>>> outputFuture; + + FutureCollectorImpl() { + outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); + collectorSealed = new AtomicBoolean(true); + } + + @Override + public void add(CompletionStage<WindowedValue<OutT>> element) { + checkState( + !collectorSealed.get(), + "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements."); + + // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs. + synchronized (this) { + outputFuture = + outputFuture.thenCombine( + element, + (collection, event) -> { + collection.add(event); + return collection; + }); + } + } + + @Override + public void addAll(CompletionStage<Collection<WindowedValue<OutT>>> elements) { + checkState( + !collectorSealed.get(), + "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements."); + + synchronized (this) { + outputFuture = FutureUtils.combineFutures(outputFuture, elements); + } + } + + @Override + public void discard() { + collectorSealed.compareAndSet(false, true); + + synchronized (this) { + outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); + } + } + + @Override + public CompletionStage<Collection<WindowedValue<OutT>>> finish() { + /* + * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op + * and an empty collection will be returned. + */ + collectorSealed.compareAndSet(false, true); + + synchronized (this) { + final CompletionStage<Collection<WindowedValue<OutT>>> sealedOutputFuture = outputFuture; + outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); + return sealedOutputFuture; + } + } + + @Override + public void prepare() { + boolean isCollectorSealed = collectorSealed.compareAndSet(true, false); + checkState( + isCollectorSealed, + "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked."); + } +} diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java index 6add2f07986..983275c7b47 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java @@ -205,7 +205,7 @@ public class AsyncDoFnRunnerTest implements Serializable { options.setNumThreadsForProcessElement(4); final OpEmitter<Void> opEmitter = new OpAdapter.OpEmitterImpl<>(); - final FutureCollector<Void> futureCollector = new DoFnOp.FutureCollectorImpl<>(); + final FutureCollector<Void> futureCollector = new FutureCollectorImpl<>(); futureCollector.prepare(); final AsyncDoFnRunner<KV<String, Integer>, Void> asyncDoFnRunner = diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java index f126dd14b83..58d0fbd3f1b 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java @@ -31,14 +31,14 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */ +/** Unit tests for {@linkplain FutureCollectorImpl}. */ public final class FutureCollectorImplTest { private static final List<String> RESULTS = ImmutableList.of("hello", "world"); - private FutureCollector<String> futureCollector = new DoFnOp.FutureCollectorImpl<>(); + private FutureCollector<String> futureCollector = new FutureCollectorImpl<>(); @Before public void setup() { - futureCollector = new DoFnOp.FutureCollectorImpl<>(); + futureCollector = new FutureCollectorImpl<>(); } @Test(expected = IllegalStateException.class)