Migrate shared Fn Execution code to Java7
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/012c2e64 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/012c2e64 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/012c2e64 Branch: refs/heads/mr-runner Commit: 012c2e64ceb691fd69649ba8cc02d9a2f16e519f Parents: 766b4f3 Author: Thomas Groh <tg...@google.com> Authored: Wed Oct 18 11:26:48 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Mon Nov 6 16:06:57 2017 -0800 ---------------------------------------------------------------------- runners/java-fn-execution/pom.xml | 14 ------ .../runners/fnexecution/ServerFactoryTest.java | 45 ++++++++++++++----- runners/pom.xml | 2 +- sdks/java/fn-execution/pom.xml | 20 ++------- .../org/apache/beam/harness/test/Consumer.java | 26 +++++++++++ .../org/apache/beam/harness/test/Supplier.java | 26 +++++++++++ .../apache/beam/harness/test/TestExecutors.java | 12 ++++- .../beam/harness/test/TestExecutorsTest.java | 29 +++++++++--- .../apache/beam/harness/test/TestStreams.java | 35 ++++++++++++--- .../beam/harness/test/TestStreamsTest.java | 47 +++++++++++++++----- .../apache/beam/fn/harness/FnHarnessTest.java | 4 +- .../harness/data/BeamFnDataGrpcClientTest.java | 5 ++- .../stream/BufferingStreamObserverTest.java | 2 +- .../stream/DirectStreamObserverTest.java | 2 +- sdks/java/pom.xml | 2 +- 15 files changed, 196 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/java-fn-execution/pom.xml ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index bd4fcf0..f57c58b 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -32,20 +32,6 @@ <packaging>jar</packaging> - <build> - <plugins> - <plugin> - <!-- Override Beam parent to allow Java8 --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java index aa8d246..b78e88a 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java @@ -39,6 +39,7 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import org.apache.beam.harness.channel.ManagedChannelFactory; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; @@ -74,24 +75,48 @@ public class ServerFactoryTest { Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder(); - Collection<Elements> serverElements = new ArrayList<>(); - CountDownLatch clientHangedUp = new CountDownLatch(1); + final Collection<Elements> serverElements = new ArrayList<>(); + final CountDownLatch clientHangedUp = new CountDownLatch(1); CallStreamObserver<Elements> serverInboundObserver = - TestStreams.withOnNext(serverElements::add) - .withOnCompleted(clientHangedUp::countDown) - .build(); + TestStreams.withOnNext( + new Consumer<Elements>() { + @Override + public void accept(Elements item) { + serverElements.add(item); + } + }) + .withOnCompleted( + new Runnable() { + @Override + public void run() { + clientHangedUp.countDown(); + } + }) + .build(); TestDataService service = new TestDataService(serverInboundObserver); Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder); assertFalse(server.isShutdown()); ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build()); BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel); - Collection<BeamFnApi.Elements> clientElements = new ArrayList<>(); - CountDownLatch serverHangedUp = new CountDownLatch(1); + final Collection<BeamFnApi.Elements> clientElements = new ArrayList<>(); + final CountDownLatch serverHangedUp = new CountDownLatch(1); CallStreamObserver<BeamFnApi.Elements> clientInboundObserver = - TestStreams.withOnNext(clientElements::add) - .withOnCompleted(serverHangedUp::countDown) - .build(); + TestStreams.withOnNext( + new Consumer<Elements>() { + @Override + public void accept(Elements item) { + clientElements.add(item); + } + }) + .withOnCompleted( + new Runnable() { + @Override + public void run() { + serverHangedUp.countDown(); + } + }) + .build(); StreamObserver<Elements> clientOutboundObserver = stub.data(clientInboundObserver); StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take(); http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index df3faa9..47f3c0e 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -35,6 +35,7 @@ <modules> <module>core-construction-java</module> <module>core-java</module> + <module>java-fn-execution</module> <module>local-artifact-service-java</module> <module>reference</module> <module>direct-java</module> @@ -63,7 +64,6 @@ <jdk>[1.8,)</jdk> </activation> <modules> - <module>java-fn-execution</module> <module>gearpump</module> </modules> </profile> http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/pom.xml b/sdks/java/fn-execution/pom.xml index 9929c29..7c203eb 100644 --- a/sdks/java/fn-execution/pom.xml +++ b/sdks/java/fn-execution/pom.xml @@ -27,27 +27,13 @@ </parent> <artifactId>beam-sdks-java-fn-execution</artifactId> - <name>Apache Beam :: SDKs :: Java :: Harness Core</name> - <description>Contains code shared across the Beam Java SDK Harness and the Java Runner Harness - libraries. + <name>Apache Beam :: SDKs :: Java :: Fn Execution</name> + <description>Contains code shared across the Beam Java SDK Harness Java Runners to execute using + the Beam Portability Framework </description> <packaging>jar</packaging> - <build> - <plugins> - <plugin> - <!-- Override Beam parent to allow Java8 --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - </plugins> - </build> - <dependencies> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java new file mode 100644 index 0000000..279fc29 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Consumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.harness.test; + +/** + * A fork of the Java 8 consumer interface. This exists to enable migration for existing consumers. + */ +public interface Consumer<T> { + void accept(T item); +} http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java new file mode 100644 index 0000000..629afc2 --- /dev/null +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/Supplier.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.harness.test; + +/** + * A fork of the Java 8 Supplier interface, to enable migrations. + */ +public interface Supplier<T> { + T get(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java index d818a61..ca12d5a 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutors.java @@ -21,7 +21,6 @@ package org.apache.beam.harness.test; import com.google.common.util.concurrent.ForwardingExecutorService; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.junit.rules.TestRule; import org.junit.runner.Description; import org.junit.runners.model.Statement; @@ -31,6 +30,15 @@ import org.junit.runners.model.Statement; * allows for testing that tasks have exercised the appropriate shutdown logic. */ public class TestExecutors { + public static TestExecutorService from(final ExecutorService staticExecutorService) { + return from(new Supplier<ExecutorService>() { + @Override + public ExecutorService get() { + return staticExecutorService; + } + }); + } + public static TestExecutorService from(Supplier<ExecutorService> executorServiceSuppler) { return new FromSupplier(executorServiceSuppler); } @@ -48,7 +56,7 @@ public class TestExecutors { } @Override - public Statement apply(Statement statement, Description arg1) { + public Statement apply(final Statement statement, Description arg1) { return new Statement() { @Override public void evaluate() throws Throwable { http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java index 1381b55..f0c98e0 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestExecutorsTest.java @@ -38,14 +38,19 @@ public class TestExecutorsTest { @Test public void testSuccessfulTermination() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final AtomicBoolean taskRan = new AtomicBoolean(); testService .apply( new Statement() { @Override public void evaluate() throws Throwable { - testService.submit(() -> taskRan.set(true)); + testService.submit(new Runnable() { + @Override + public void run() { + taskRan.set(true); + } + }); } }, null) @@ -57,7 +62,7 @@ public class TestExecutorsTest { @Test public void testTaskBlocksForeverCausesFailure() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final AtomicBoolean taskStarted = new AtomicBoolean(); final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); try { @@ -66,7 +71,12 @@ public class TestExecutorsTest { new Statement() { @Override public void evaluate() throws Throwable { - testService.submit(this::taskToRun); + testService.submit(new Runnable() { + @Override + public void run() { + taskToRun(); + } + }); } private void taskToRun() { @@ -94,7 +104,7 @@ public class TestExecutorsTest { @Test public void testStatementFailurePropagatedCleanly() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final RuntimeException exceptionToThrow = new RuntimeException(); try { testService @@ -118,7 +128,7 @@ public class TestExecutorsTest { public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate() throws Throwable { ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); + final TestExecutorService testService = TestExecutors.from(service); final AtomicBoolean taskStarted = new AtomicBoolean(); final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); final RuntimeException exceptionToThrow = new RuntimeException(); @@ -128,7 +138,12 @@ public class TestExecutorsTest { new Statement() { @Override public void evaluate() throws Throwable { - testService.submit(this::taskToRun); + testService.submit(new Runnable() { + @Override + public void run() { + taskToRun(); + } + }); throw exceptionToThrow; } http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java index a7b362d..3df743a 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreams.java @@ -20,8 +20,6 @@ package org.apache.beam.harness.test; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; -import java.util.function.Consumer; -import java.util.function.Supplier; /** Utility methods which enable testing of {@link StreamObserver}s. */ public class TestStreams { @@ -32,9 +30,9 @@ public class TestStreams { public static <T> Builder<T> withOnNext(Consumer<T> onNext) { return new Builder<>(new ForwardingCallStreamObserver<>( onNext, - TestStreams::noop, - TestStreams::noop, - TestStreams::returnTrue)); + TestStreams.<Throwable>noopConsumer(), + TestStreams.noopRunnable(), + TestStreams.alwaysTrueSupplier())); } /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */ @@ -72,7 +70,7 @@ public class TestStreams { * Returns a new {@link Builder} like this one with the specified * {@link StreamObserver#onError} callback. */ - public Builder<T> withOnError(Runnable onError) { + public Builder<T> withOnError(final Runnable onError) { return new Builder<>(new ForwardingCallStreamObserver<>( observer.onNext, new Consumer<Throwable>() { @@ -102,13 +100,38 @@ public class TestStreams { private static void noop() { } + private static Runnable noopRunnable() { + return new Runnable() { + @Override + public void run() { + } + }; + } + private static void noop(Throwable t) { } + private static <T> Consumer<T> noopConsumer() { + return new Consumer<T>() { + @Override + public void accept(T item) { + } + }; + } + private static boolean returnTrue() { return true; } + private static Supplier<Boolean> alwaysTrueSupplier() { + return new Supplier<Boolean>() { + @Override + public Boolean get() { + return true; + } + }; + } + /** A {@link CallStreamObserver} which executes the supplied callbacks. */ private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> { private final Consumer<T> onNext; http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java index f5741ae..c578397 100644 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java +++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/harness/test/TestStreamsTest.java @@ -18,7 +18,6 @@ package org.apache.beam.harness.test; -import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -26,6 +25,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; +import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -35,8 +35,13 @@ import org.junit.runners.JUnit4; public class TestStreamsTest { @Test public void testOnNextIsCalled() { - AtomicBoolean onNextWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true); + final AtomicBoolean onNextWasCalled = new AtomicBoolean(); + TestStreams.withOnNext(new Consumer<Boolean>() { + @Override + public void accept(Boolean item) { + onNextWasCalled.set(item); + } + }).build().onNext(true); assertTrue(onNextWasCalled.get()); } @@ -44,7 +49,12 @@ public class TestStreamsTest { public void testIsReadyIsCalled() { final AtomicBoolean isReadyWasCalled = new AtomicBoolean(); assertFalse(TestStreams.withOnNext(null) - .withIsReady(() -> isReadyWasCalled.getAndSet(true)) + .withIsReady(new Supplier<Boolean>() { + @Override + public Boolean get() { + return isReadyWasCalled.getAndSet(true); + } + }) .build() .isReady()); assertTrue(isReadyWasCalled.get()); @@ -52,9 +62,14 @@ public class TestStreamsTest { @Test public void testOnCompletedIsCalled() { - AtomicBoolean onCompletedWasCalled = new AtomicBoolean(); + final AtomicBoolean onCompletedWasCalled = new AtomicBoolean(); TestStreams.withOnNext(null) - .withOnCompleted(() -> onCompletedWasCalled.set(true)) + .withOnCompleted(new Runnable() { + @Override + public void run() { + onCompletedWasCalled.set(true); + } + }) .build() .onCompleted(); assertTrue(onCompletedWasCalled.get()); @@ -63,9 +78,14 @@ public class TestStreamsTest { @Test public void testOnErrorRunnableIsCalled() { RuntimeException throwable = new RuntimeException(); - AtomicBoolean onErrorWasCalled = new AtomicBoolean(); + final AtomicBoolean onErrorWasCalled = new AtomicBoolean(); TestStreams.withOnNext(null) - .withOnError(() -> onErrorWasCalled.set(true)) + .withOnError(new Runnable() { + @Override + public void run() { + onErrorWasCalled.set(true); + } + }) .build() .onError(throwable); assertTrue(onErrorWasCalled.get()); @@ -74,11 +94,16 @@ public class TestStreamsTest { @Test public void testOnErrorConsumerIsCalled() { RuntimeException throwable = new RuntimeException(); - Collection<Throwable> onErrorWasCalled = new ArrayList<>(); + final Collection<Throwable> onErrorWasCalled = new ArrayList<>(); TestStreams.withOnNext(null) - .withOnError(onErrorWasCalled::add) + .withOnError(new Consumer<Throwable>() { + @Override + public void accept(Throwable item) { + onErrorWasCalled.add(item); + } + }) .build() .onError(throwable); - assertThat(onErrorWasCalled, contains(throwable)); + assertThat(onErrorWasCalled, Matchers.<Throwable>contains(throwable)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java index 66c31a8..c926414 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java @@ -28,7 +28,7 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.function.Consumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; @@ -91,7 +91,7 @@ public class FnHarnessTest { responseObserver.onCompleted(); } }); - return TestStreams.withOnNext(new Consumer<BeamFnApi.InstructionResponse>() { + return TestStreams.withOnNext(new Consumer<InstructionResponse>() { @Override public void accept(InstructionResponse t) { instructionResponses.add(t); http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java index 7df8925..9e21398 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java @@ -41,12 +41,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestStreams; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.sdk.coders.Coder; @@ -263,7 +264,7 @@ public class BeamFnDataGrpcClientTest { Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>(); CallStreamObserver<BeamFnApi.Elements> inboundServerObserver = TestStreams.withOnNext( - new Consumer<BeamFnApi.Elements>() { + new Consumer<Elements>() { @Override public void accept(BeamFnApi.Elements t) { inboundServerValues.add(t); http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java index 3f66c4c..96648e9 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/BufferingStreamObserverTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestExecutors; import org.apache.beam.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.harness.test.TestStreams; http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java index 120a73d..05d8d5a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/stream/DirectStreamObserverTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import org.apache.beam.harness.test.Consumer; import org.apache.beam.harness.test.TestExecutors; import org.apache.beam.harness.test.TestExecutors.TestExecutorService; import org.apache.beam.harness.test.TestStreams; http://git-wip-us.apache.org/repos/asf/beam/blob/012c2e64/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 62e4ec3..c6ab234 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -40,6 +40,7 @@ <module>io</module> <module>maven-archetypes</module> <module>extensions</module> + <module>fn-execution</module> <!-- javadoc runs directly from the root parent as the last module in the build to be able to capture runner-specific javadoc. <module>javadoc</module> --> @@ -53,7 +54,6 @@ <jdk>[1.8,)</jdk> </activation> <modules> - <module>fn-execution</module> <module>harness</module> <module>container</module> <module>java8tests</module>