[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=112965&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112965 ]
ASF GitHub Bot logged work on BEAM-4145: ---------------------------------------- Author: ASF GitHub Bot Created on: 19/Jun/18 00:36 Start Date: 19/Jun/18 00:36 Worklog Time Spent: 10m Work Description: jkff closed pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java index 03a3b550eea..b4c475248d6 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java @@ -88,6 +88,7 @@ public RemoteEnvironment createEnvironment(Environment container) throws Excepti () -> { try { FnHarness.main( + "id", options, loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 2c95ebc585a..25ba91e7f05 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -131,6 +131,7 @@ public void setup() throws Exception { sdkHarnessExecutor.submit( () -> FnHarness.main( + "id", PipelineOptionsFactory.create(), loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java deleted file mode 100644 index e134aecc5be..00000000000 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.reference.testing; - -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; - -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. - * - * <p>The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. - */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { - - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { - return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); - } -} diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 1c80e0bab94..ad7a35d2972 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -92,6 +92,7 @@ func main() { // (3) Invoke the Java harness, preserving artifact ordering in classpath. + os.Setenv("HARNESS_ID", *id) os.Setenv("PIPELINE_OPTIONS", options) os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *loggingEndpoint})) os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *controlEndpoint})) diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java index 0a4a35d58f1..57d2c68e363 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.fn.channel; +import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.netty.NettyChannelBuilder; @@ -26,11 +27,10 @@ import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.unix.DomainSocketAddress; import java.net.SocketAddress; +import java.util.List; import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -/** - * A Factory which creates an underlying {@link ManagedChannel} implementation. - */ +/** A Factory which creates an underlying {@link ManagedChannel} implementation. */ public abstract class ManagedChannelFactory { public static ManagedChannelFactory createDefault() { return new Default(); @@ -41,7 +41,20 @@ public static ManagedChannelFactory createEpoll() { return new Epoll(); } - public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor); + public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + return builderFor(apiServiceDescriptor).build(); + } + + /** Create a {@link ManagedChannelBuilder} for the provided {@link ApiServiceDescriptor}. */ + protected abstract ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor descriptor); + + /** + * Returns a {@link ManagedChannelFactory} like this one, but which will apply the provided {@link + * ClientInterceptor ClientInterceptors} to any channel it creates. + */ + public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) { + return new InterceptedManagedChannelFactory(this, interceptors); + } /** * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address @@ -50,17 +63,18 @@ public static ManagedChannelFactory createEpoll() { */ private static class Epoll extends ManagedChannelFactory { @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor apiServiceDescriptor) { SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl()); return NettyChannelBuilder.forAddress(address) - .channelType(address instanceof DomainSocketAddress - ? EpollDomainSocketChannel.class : EpollSocketChannel.class) + .channelType( + address instanceof DomainSocketAddress + ? EpollDomainSocketChannel.class + : EpollSocketChannel.class) .eventLoopGroup(new EpollEventLoopGroup()) .usePlaintext(true) // Set the message size to max value here. The actual size is governed by the // buffer size in the layers above. - .maxInboundMessageSize(Integer.MAX_VALUE) - .build(); + .maxInboundMessageSize(Integer.MAX_VALUE); } } @@ -70,13 +84,38 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { */ private static class Default extends ManagedChannelFactory { @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor apiServiceDescriptor) { return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) .usePlaintext(true) // Set the message size to max value here. The actual size is governed by the // buffer size in the layers above. - .maxInboundMessageSize(Integer.MAX_VALUE) - .build(); + .maxInboundMessageSize(Integer.MAX_VALUE); + } + } + + private static class InterceptedManagedChannelFactory extends ManagedChannelFactory { + private final ManagedChannelFactory channelFactory; + private final List<ClientInterceptor> interceptors; + + private InterceptedManagedChannelFactory( + ManagedChannelFactory managedChannelFactory, List<ClientInterceptor> interceptors) { + this.channelFactory = managedChannelFactory; + this.interceptors = interceptors; + } + + @Override + public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + return builderFor(apiServiceDescriptor).intercept(interceptors).build(); + } + + @Override + protected ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor descriptor) { + return channelFactory.builderFor(descriptor); + } + + @Override + public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) { + return new InterceptedManagedChannelFactory(channelFactory, interceptors); } } } diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java index 787047b7f6e..6e0d87fe907 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.fn.test; -import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.inprocess.InProcessChannelBuilder; import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; @@ -35,7 +35,7 @@ public static ManagedChannelFactory create() { private InProcessManagedChannelFactory() {} @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { - return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); + public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor apiServiceDescriptor) { + return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 204e828092d..8d036b27adc 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -49,18 +49,21 @@ * Main entry point into the Beam SDK Fn Harness for Java. * * <p>This entry point expects the following environment variables: + * * <ul> - * <li>LOGGING_API_SERVICE_DESCRIPTOR: A - * {@link org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor} encoded as text - * representing the endpoint that is to be connected to for the Beam Fn Logging service.</li> - * <li>CONTROL_API_SERVICE_DESCRIPTOR: A - * {@link Endpoints.ApiServiceDescriptor} encoded as text - * representing the endpoint that is to be connected to for the Beam Fn Control service.</li> + * <li>HARNESS_ID: A String representing the ID of this FnHarness. This will be added to the + * headers of calls to the Beam Control Service + * <li>LOGGING_API_SERVICE_DESCRIPTOR: A {@link + * org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor} encoded as text + * representing the endpoint that is to be connected to for the Beam Fn Logging service. + * <li>CONTROL_API_SERVICE_DESCRIPTOR: A {@link Endpoints.ApiServiceDescriptor} encoded as text + * representing the endpoint that is to be connected to for the Beam Fn Control service. * <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions} - * for further details.</li> + * for further details. * </ul> */ public class FnHarness { + private static final String HARNESS_ID = "HARNESS_ID"; private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR"; private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR"; private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS"; @@ -76,14 +79,17 @@ public static void main(String[] args) throws Exception { System.out.format("SDK Fn Harness started%n"); + System.out.format("Harness ID %s%n", System.getenv(HARNESS_ID)); System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR)); System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR)); System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS)); - ObjectMapper objectMapper = new ObjectMapper().registerModules( - ObjectMapper.findModules(ReflectHelpers.findClassLoader())); - PipelineOptions options = objectMapper.readValue( - System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); + String id = System.getenv(HARNESS_ID); + ObjectMapper objectMapper = + new ObjectMapper() + .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + PipelineOptions options = + objectMapper.readValue(System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR); @@ -91,12 +97,15 @@ public static void main(String[] args) throws Exception { Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR); - main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor); + main(id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor); } - public static void main(PipelineOptions options, + public static void main( + String id, + PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, - Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception { + Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) + throws Exception { ManagedChannelFactory channelFactory; List<String> experiments = options.as(ExperimentalOptions.class).getExperiments(); if (experiments != null && experiments.contains("beam_fn_api_epoll")) { @@ -107,6 +116,7 @@ public static void main(PipelineOptions options, StreamObserverFactory streamObserverFactory = HarnessStreamObserverFactories.fromOptions(options); main( + id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor, @@ -115,43 +125,46 @@ public static void main(PipelineOptions options, } public static void main( + String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, ManagedChannelFactory channelFactory, StreamObserverFactory streamObserverFactory) { IdGenerator idGenerator = IdGenerators.decrementingLongs(); - try (BeamFnLoggingClient logging = new BeamFnLoggingClient( - options, - loggingApiServiceDescriptor, - channelFactory::forDescriptor)) { + try (BeamFnLoggingClient logging = + new BeamFnLoggingClient( + options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); - EnumMap<BeamFnApi.InstructionRequest.RequestCase, + EnumMap< + BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<InstructionRequest, Builder>> handlers = new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class); RegisterHandler fnApiRegistry = new RegisterHandler(); - BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient( - options, channelFactory::forDescriptor, streamObserverFactory::from); + BeamFnDataGrpcClient beamFnDataMultiplexer = + new BeamFnDataGrpcClient( + options, channelFactory::forDescriptor, streamObserverFactory::from); BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache( options, idGenerator, channelFactory::forDescriptor, streamObserverFactory::from); - ProcessBundleHandler processBundleHandler = new ProcessBundleHandler( - options, - fnApiRegistry::getById, - beamFnDataMultiplexer, - beamFnStateGrpcClientCache); - handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, - fnApiRegistry::register); - handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, + ProcessBundleHandler processBundleHandler = + new ProcessBundleHandler( + options, fnApiRegistry::getById, beamFnDataMultiplexer, beamFnStateGrpcClientCache); + handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, fnApiRegistry::register); + handlers.put( + BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, processBundleHandler::processBundle); - BeamFnControlClient control = new BeamFnControlClient(controlApiServiceDescriptor, - channelFactory::forDescriptor, - streamObserverFactory::from, - handlers); + BeamFnControlClient control = + new BeamFnControlClient( + id, + controlApiServiceDescriptor, + channelFactory, + streamObserverFactory::from, + handlers); LOG.info("Entering instruction processing loop"); control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService()); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java new file mode 100644 index 00000000000..8a215607171 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.beam.fn.harness.control; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.grpc.ClientInterceptor; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.stub.MetadataUtils; + +/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ +public class AddHarnessIdInterceptor { + private static final Key<String> ID_KEY = Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); + + public static ClientInterceptor create(String harnessId) { + checkArgument(harnessId != null, "harnessId must not be null"); + Metadata md = new Metadata(); + md.put(ID_KEY, harnessId); + return MetadataUtils.newAttachHeadersInterceptor(md); + } + + // This is implemented via MetadataUtils, so we never actually create an instance of this class + private AddHarnessIdInterceptor() {} +} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java index ab932c797d4..7c0ed198a4e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java @@ -20,8 +20,8 @@ import static com.google.common.base.Throwables.getStackTraceAsString; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Uninterruptibles; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.stub.StreamObserver; import java.util.EnumMap; @@ -31,11 +31,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; import java.util.function.BiFunction; -import java.util.function.Function; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest.RequestCase; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder; import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc; -import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; import org.apache.beam.sdk.fn.function.ThrowingFunction; import org.apache.beam.sdk.fn.stream.StreamObserverFactory.StreamObserverClientFactory; import org.slf4j.Logger; @@ -69,21 +72,23 @@ private final CompletableFuture<Object> onFinish; public BeamFnControlClient( - Endpoints.ApiServiceDescriptor apiServiceDescriptor, - Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, + String id, + ApiServiceDescriptor apiServiceDescriptor, + ManagedChannelFactory channelFactory, BiFunction< - StreamObserverClientFactory<InstructionRequest, BeamFnApi.InstructionResponse>, - StreamObserver<BeamFnApi.InstructionRequest>, - StreamObserver<BeamFnApi.InstructionResponse>> + StreamObserverClientFactory<InstructionRequest, InstructionResponse>, + StreamObserver<InstructionRequest>, + StreamObserver<InstructionResponse>> streamObserverFactory, - EnumMap< - BeamFnApi.InstructionRequest.RequestCase, - ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> - handlers) { + EnumMap<RequestCase, ThrowingFunction<InstructionRequest, Builder>> handlers) { this.bufferedInstructions = new LinkedBlockingDeque<>(); this.outboundObserver = streamObserverFactory.apply( - BeamFnControlGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::control, + BeamFnControlGrpc.newStub( + channelFactory + .withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id))) + .forDescriptor(apiServiceDescriptor)) + ::control, new InboundObserver()); this.handlers = handlers; this.onFinish = new CompletableFuture<>(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java index 0e68f6d8dd1..8dc43e56727 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java @@ -118,7 +118,7 @@ public void testLaunchFnHarnessAndTeardownCleanly() throws Exception { .setUrl("localhost:" + controlServer.getPort()) .build(); - FnHarness.main(options, loggingDescriptor, controlDescriptor); + FnHarness.main("id", options, loggingDescriptor, controlDescriptor); assertThat(instructionResponses, contains(INSTRUCTION_RESPONSE)); } finally { controlServer.shutdownNow(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java index c3661639d5c..d33705dc6bf 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java @@ -27,9 +27,7 @@ import static org.junit.Assert.fail; import com.google.common.util.concurrent.Uninterruptibles; -import io.grpc.ManagedChannel; import io.grpc.Server; -import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; @@ -49,6 +47,7 @@ import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.sdk.fn.function.ThrowingFunction; import org.apache.beam.sdk.fn.stream.StreamObserverFactory.StreamObserverClientFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; import org.apache.beam.sdk.fn.test.TestStreams; import org.junit.Test; import org.junit.runner.RunWith; @@ -117,9 +116,6 @@ public void testDelegation() throws Exception { .build(); server.start(); try { - ManagedChannel channel = - InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); - EnumMap< BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> @@ -137,8 +133,9 @@ public void testDelegation() throws Exception { BeamFnControlClient client = new BeamFnControlClient( + "", apiServiceDescriptor, - (Endpoints.ApiServiceDescriptor descriptor) -> channel, + InProcessManagedChannelFactory.create(), this::createStreamForTest, handlers); @@ -204,9 +201,6 @@ public void testJavaErrorResponse() throws Exception { .build(); server.start(); try { - ManagedChannel channel = - InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); - EnumMap< BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> @@ -219,8 +213,9 @@ public void testJavaErrorResponse() throws Exception { BeamFnControlClient client = new BeamFnControlClient( + "", apiServiceDescriptor, - (Endpoints.ApiServiceDescriptor descriptor) -> channel, + InProcessManagedChannelFactory.create(), this::createStreamForTest, handlers); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 112965) Time Spent: 4h (was: 3h 50m) > Java SDK Harness populates control request headers with worker id > ----------------------------------------------------------------- > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness > Reporter: Ben Sidhom > Assignee: Eugene Kirpichov > Priority: Minor > Time Spent: 4h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code). -- This message was sent by Atlassian JIRA (v7.6.3#76005)