[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=114435&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114435 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 21/Jun/18 19:39 Start Date: 21/Jun/18 19:39 Worklog Time Spent: 10m Work Description: angoenka commented on issue #5680: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5680#issuecomment-399219189 Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114435) Time Spent: 4h 40m (was: 4.5h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Eugene Kirpichov >Priority: Minor > Fix For: 2.6.0 > > Time Spent: 4h 40m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
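The problem described in BEAM-4145 comes down to a lookup keyed by worker id. The runner mints an id when it creates a harness, and it must later match each incoming control connection to that id. An in-process harness that never sends an id cannot be matched. The stdlib-only Java below is a minimal sketch of that bookkeeping and nothing more. `WorkerRegistry`, `createWorker`, `acceptConnection` and the `worker-N` id format are hypothetical and are not Beam APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical runner-side bookkeeping, not part of Beam. C is whatever represents a connection. */
class WorkerRegistry<C> {
  private final AtomicLong nextId = new AtomicLong(1);
  // Every worker id the runner has handed out, mapped to its connection once one arrives.
  private final Map<String, Optional<C>> workers = new ConcurrentHashMap<>();

  /** Assigns a fresh worker id when the runner creates a harness. */
  String createWorker() {
    String id = "worker-" + nextId.getAndIncrement();
    workers.put(id, Optional.empty());
    return id;
  }

  /** Binds an incoming control connection to the worker id it presented. */
  void acceptConnection(String workerId, C connection) {
    if (workerId == null) {
      // The in-process case from the issue: there is nothing to match against.
      throw new IllegalStateException("connection did not identify its worker");
    }
    // replace() succeeds only for ids that createWorker() handed out.
    if (workers.replace(workerId, Optional.of(connection)) == null) {
      throw new IllegalArgumentException("unknown worker id: " + workerId);
    }
  }

  /** Returns the connection bound to a worker id, if one has arrived. */
  Optional<C> connectionFor(String workerId) {
    return workers.getOrDefault(workerId, Optional.empty());
  }
}
```

With ids present, several harnesses can share one control endpoint and still be told apart, which is the multiplexing the issue asks for.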
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=114436&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114436 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 21/Jun/18 19:39 Start Date: 21/Jun/18 19:39 Worklog Time Spent: 10m Work Description: jkff closed pull request #5680: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5680 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java index 588153fd933..a849d71031b 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java @@ -89,6 +89,7 @@ public RemoteEnvironment createEnvironment(Environment container) throws Excepti () -> { try { FnHarness.main( +"id", options, loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 0d9c3b21323..cd9f8b03df7 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -132,6 +132,7 @@ public void setup() throws Exception { sdkHarnessExecutor.submit( () -> FnHarness.main( +"id", PipelineOptionsFactory.create(), loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java deleted file mode 100644 index e134aecc5be..000 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.reference.testing; - -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; - -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. 
- * - * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. - */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { - - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { -return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); - } -} diff --git a/sdks/go/pkg/beam/util/grpcx/metadata.go b/sdks/go/pkg/beam/util/grpcx/metadata.go index eed51ede439..0d77d62a8f3 100644 --- a/sdks/go/pkg/beam/util/grpcx/metadata.go +++ b/sdks/go/pkg/beam/util/grpcx/metadata.go @@ -24,7 +24,7 @@ import ( "google.golang.org/grpc/metadata" ) -const idKey = "id" +const idKey = "worker_id" // ReadWorkerID reads
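The Go change above renames the metadata key from "id" to "worker_id", so whatever reads the header on the runner side must agree on the new name. The Java below is a hedged illustration that reads that key from a header map. The `Map<String, List<String>>` stands in for gRPC metadata, where a key can carry several values. `WorkerIdHeaders` and `readWorkerId` are hypothetical. The body of the Go `ReadWorkerID` is cut off in this diff, so the missing-key and multiple-value checks are assumptions and do not describe that function.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper; the header map stands in for incoming gRPC request metadata. */
final class WorkerIdHeaders {
  /** Both sides must use the same name; the Go change above renames it from "id". */
  static final String WORKER_ID_KEY = "worker_id";

  private WorkerIdHeaders() {}

  /** Returns the single worker id value, rejecting absent or ambiguous headers. */
  static String readWorkerId(Map<String, List<String>> headers) {
    List<String> values = headers.get(WORKER_ID_KEY);
    if (values == null || values.isEmpty()) {
      throw new IllegalArgumentException("no " + WORKER_ID_KEY + " header present");
    }
    if (values.size() > 1) {
      throw new IllegalArgumentException(
          "expected one " + WORKER_ID_KEY + " value, got " + values);
    }
    return values.get(0);
  }
}
```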
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=113449&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113449 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 19/Jun/18 23:40 Start Date: 19/Jun/18 23:40 Worklog Time Spent: 10m Work Description: jkff commented on issue #5680: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5680#issuecomment-398579048 Test passed (but didn't propagate status to Github), PTAL. Issue Time Tracking --- Worklog Id: (was: 113449) Time Spent: 4.5h (was: 4h 20m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Eugene Kirpichov >Priority: Minor > Time Spent: 4.5h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=113383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113383 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 19/Jun/18 21:21 Start Date: 19/Jun/18 21:21 Worklog Time Spent: 10m Work Description: jkff commented on issue #5680: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5680#issuecomment-398550341 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 113383) Time Spent: 4h 20m (was: 4h 10m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Eugene Kirpichov >Priority: Minor > Time Spent: 4h 20m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=113301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113301 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 19/Jun/18 18:15 Start Date: 19/Jun/18 18:15 Worklog Time Spent: 10m Work Description: jkff commented on issue #5680: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5680#issuecomment-398495022 retest this please Issue Time Tracking --- Worklog Id: (was: 113301) Time Spent: 4h 10m (was: 4h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Eugene Kirpichov >Priority: Minor > Time Spent: 4h 10m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=112964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112964 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 19/Jun/18 00:36 Start Date: 19/Jun/18 00:36 Worklog Time Spent: 10m Work Description: jkff commented on issue #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#issuecomment-398237617 This PR is replaced by #5680. Issue Time Tracking --- Worklog Id: (was: 112964) Time Spent: 3h 50m (was: 3h 40m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Eugene Kirpichov >Priority: Minor > Time Spent: 3h 50m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=112965&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112965 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 19/Jun/18 00:36 Start Date: 19/Jun/18 00:36 Worklog Time Spent: 10m Work Description: jkff closed pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456 diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java index 03a3b550eea..b4c475248d6 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java @@ -88,6 +88,7 @@ public RemoteEnvironment createEnvironment(Environment container) throws Excepti () -> { try { FnHarness.main( +"id", options, loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 2c95ebc585a..25ba91e7f05 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -131,6 +131,7 @@ public void setup() throws Exception { sdkHarnessExecutor.submit( () -> FnHarness.main( +"id", PipelineOptionsFactory.create(), loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java deleted file mode 100644 index e134aecc5be..000 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InProcessManagedChannelFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.reference.testing; - -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; - -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. 
- * - * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. - */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { - - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { -return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); - } -} diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 1c80e0bab94..ad7a35d2972 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -92,6 +92,7 @@ func main() { // (3) Invoke the Java harness, preserving artifact ordering in classpath. + os.Setenv("HARNESS_ID", *id) os.Setenv("PIPELINE_OPTIONS",
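With this change, boot.go exports the worker id to the Java process as the `HARNESS_ID` environment variable. The Java side of that hand-off is not shown in this diff. The sketch below assumes the harness entry point simply reads the variable and refuses to start without it. `HarnessIdFromEnvironment` and `resolveHarnessId` are hypothetical names. The environment is passed in as a map so the logic can be exercised without a real process environment; inside a container it would be called as `resolveHarnessId(System.getenv())`.

```java
import java.util.Map;

/** Hypothetical entry-point helper; only the Go side of this hand-off appears in the diff. */
final class HarnessIdFromEnvironment {
  /** Variable name taken from the boot.go change above. */
  static final String HARNESS_ID_ENV = "HARNESS_ID";

  private HarnessIdFromEnvironment() {}

  /** Reads the harness id from the given environment, failing fast if it is missing. */
  static String resolveHarnessId(Map<String, String> env) {
    String id = env.get(HARNESS_ID_ENV);
    if (id == null || id.isEmpty()) {
      throw new IllegalStateException(
          HARNESS_ID_ENV + " is not set; the runner could not identify this harness");
    }
    return id;
  }
}
```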
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=112963&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112963 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 19/Jun/18 00:35 Start Date: 19/Jun/18 00:35 Worklog Time Spent: 10m Work Description: jkff opened a new pull request #5680: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5680 This is a take-over of #5456 since Thomas is currently unavailable. I took the PR verbatim, rebased it / resolved conflicts, and addressed the review comments: - Added a comment about the lifetime of the logging service. - Re: "We are losing the original list of interceptors here." I think it's conventional to have a withFoo(List) method overwrite the list rather than append, so no action on this one. R: @angoenka Issue Time Tracking --- Worklog Id: (was: 112963) Time Spent: 3h 40m (was: 3.5h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Eugene Kirpichov >Priority: Minor > Time Spent: 3h 40m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=108697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108697 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 04/Jun/18 18:47 Start Date: 04/Jun/18 18:47 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r192842184 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ## @@ -115,43 +124,46 @@ public static void main(PipelineOptions options, } public static void main( + String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, ManagedChannelFactory channelFactory, StreamObserverFactory streamObserverFactory) { IdGenerator idGenerator = IdGenerators.decrementingLongs(); -try (BeamFnLoggingClient logging = new BeamFnLoggingClient( -options, -loggingApiServiceDescriptor, -channelFactory::forDescriptor)) { +try (BeamFnLoggingClient logging = Review comment: Can we add a comment explaining this as logging is unused which makes this code block non intuitive. Issue Time Tracking --- Worklog Id: (was: 108697) Time Spent: 3h 20m (was: 3h 10m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 3h 20m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. 
This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
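The review comment is about a try-with-resources block whose resource variable is never read. The logging client is opened only so that it stays alive for the duration of the harness's control loop and is closed on exit. The stdlib-only sketch below reproduces that shape with a hypothetical `LifetimeScopedResource` that records when it is opened and closed. It does not use `BeamFnLoggingClient` itself.

```java
import java.util.List;

/** Hypothetical stand-in for a client whose only job is to stay open while the harness runs. */
final class LifetimeScopedResource implements AutoCloseable {
  private final List<String> events;

  LifetimeScopedResource(List<String> events) {
    this.events = events;
    events.add("logging opened");
  }

  @Override
  public void close() {
    events.add("logging closed");
  }

  /** Mirrors the reviewed shape: the resource variable is never read inside the block. */
  static void runHarness(List<String> events) {
    // The logging client is not referenced below. It is declared here only so that it stays
    // open for the whole control loop and is closed when the harness shuts down.
    try (LifetimeScopedResource logging = new LifetimeScopedResource(events)) {
      events.add("control loop ran");
    }
  }
}
```

A comment like the one inside the block is what the reviewer asked for. Compiling with `javac -Xlint:try` would also warn that the resource is never referenced in the body.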
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=108698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108698 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 04/Jun/18 18:47 Start Date: 04/Jun/18 18:47 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r192269204 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ## @@ -70,13 +92,42 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { */ private static class Default extends ManagedChannelFactory { @Override -public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { +public ManagedChannel forDescriptor( +ApiServiceDescriptor apiServiceDescriptor, List interceptors) { return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) .usePlaintext(true) // Set the message size to max value here. The actual size is governed by the // buffer size in the layers above. 
.maxInboundMessageSize(Integer.MAX_VALUE) + .intercept(interceptors) .build(); } } + + private static class InterceptedManagedChannelFactory extends ManagedChannelFactory { +private final ManagedChannelFactory channelFactory; +private final List interceptors; + +private InterceptedManagedChannelFactory( +ManagedChannelFactory managedChannelFactory, List interceptors) { + this.channelFactory = managedChannelFactory; + this.interceptors = interceptors; +} + +@Override +protected ManagedChannel forDescriptorOnly(ApiServiceDescriptor apiServiceDescriptor) { + return forDescriptor(apiServiceDescriptor, interceptors); +} + +@Override +protected ManagedChannel forDescriptor( +ApiServiceDescriptor apiServiceDescriptor, List interceptors) { + return channelFactory.forDescriptor(apiServiceDescriptor, interceptors); +} + +@Override +public ManagedChannelFactory withInterceptors(List interceptors) { + return new InterceptedManagedChannelFactory(channelFactory, interceptors); Review comment: We are losing the original list of interceptors here. Issue Time Tracking --- Worklog Id: (was: 108698) Time Spent: 3.5h (was: 3h 20m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 3.5h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
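This thread, and jkff's reply in the #5680 description, turn on whether `withInterceptors` should replace or extend an existing interceptor list. The gRPC-free Java model below uses strings in place of `ClientInterceptor` instances to contrast the two choices. `withInterceptors` overwrites, which is how the PR kept it. The hypothetical `plusInterceptors` appends, which is what the reviewer's concern would call for. `ChannelFactoryModel` is illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, gRPC-free model of the decorator in the diff; strings stand in for interceptors. */
final class ChannelFactoryModel {
  private final List<String> interceptors;

  private ChannelFactoryModel(List<String> interceptors) {
    // Defensive, read-only copy so each factory instance is immutable.
    this.interceptors = Collections.unmodifiableList(new ArrayList<>(interceptors));
  }

  static ChannelFactoryModel createDefault() {
    return new ChannelFactoryModel(List.of());
  }

  /** Overwrite semantics, as kept in the PR: previously configured interceptors are dropped. */
  ChannelFactoryModel withInterceptors(List<String> newInterceptors) {
    return new ChannelFactoryModel(newInterceptors);
  }

  /** Append semantics, the alternative raised in review (hypothetical method name). */
  ChannelFactoryModel plusInterceptors(List<String> extra) {
    List<String> combined = new ArrayList<>(interceptors);
    combined.addAll(extra);
    return new ChannelFactoryModel(combined);
  }

  List<String> interceptors() {
    return interceptors;
  }
}
```

Overwrite semantics keep `withFoo` predictable and idempotent. The cost is that a caller who wants to add to an existing chain must pass the full list again.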
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=108696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108696 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 04/Jun/18 18:47 Start Date: 04/Jun/18 18:47 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r192842380 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java ## @@ -6,31 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.beam.runners.reference.testing; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +package org.apache.beam.fn.harness.control; -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. - * - * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. 
- */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { +import io.grpc.ClientInterceptor; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.stub.MetadataUtils; + +/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ +public class AddHarnessIdInterceptor { Review comment: SG Issue Time Tracking --- Worklog Id: (was: 108696) Time Spent: 3h 10m (was: 3h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 3h 10m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
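The new `AddHarnessIdInterceptor` is described as a `ClientInterceptor` that attaches the SDK harness ID to outgoing messages. Its imports (`Metadata`, `Metadata.Key`, `MetadataUtils`) suggest it builds a header-attaching interceptor. Because the class body is not shown, the Java below is only a simplified, stdlib-only model of the idea: each "interceptor" is a function that returns a copy of the outgoing headers with a `worker_id` entry added. The header name is an assumption based on the PR title and the Go change. `HarnessIdStamping`, `addHarnessId` and `send` are hypothetical. Real gRPC interceptors wrap calls rather than transform maps, and this model does not reproduce gRPC's interceptor ordering.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical, gRPC-free model of an interceptor that stamps outgoing request headers. */
final class HarnessIdStamping {
  private HarnessIdStamping() {}

  /** Returns an "interceptor" that adds the worker id header to every outgoing call. */
  static UnaryOperator<Map<String, String>> addHarnessId(String harnessId) {
    if (harnessId == null || harnessId.isEmpty()) {
      throw new IllegalArgumentException("harness id must be non-empty");
    }
    return headers -> {
      // Copy rather than mutate, so callers' header maps are left untouched.
      Map<String, String> out = new LinkedHashMap<>(headers);
      out.put("worker_id", harnessId);
      return out;
    };
  }

  /** Applies each interceptor in list order before a request would be sent. */
  static Map<String, String> send(
      Map<String, String> headers, List<UnaryOperator<Map<String, String>>> interceptors) {
    Map<String, String> current = headers;
    for (UnaryOperator<Map<String, String>> interceptor : interceptors) {
      current = interceptor.apply(current);
    }
    return current;
  }
}
```

In grpc-java itself, a fixed header is commonly attached with `MetadataUtils.newAttachHeadersInterceptor(Metadata)`. Whether this PR uses exactly that call is not visible here.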
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=108169&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108169 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 01/Jun/18 19:27 Start Date: 01/Jun/18 19:27 Worklog Time Spent: 10m Work Description: youngoli commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r192493270 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ## @@ -70,13 +92,42 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { */ private static class Default extends ManagedChannelFactory { @Override -public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { +public ManagedChannel forDescriptor( +ApiServiceDescriptor apiServiceDescriptor, List interceptors) { return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) .usePlaintext(true) // Set the message size to max value here. The actual size is governed by the // buffer size in the layers above. 
.maxInboundMessageSize(Integer.MAX_VALUE) + .intercept(interceptors) .build(); } } + + private static class InterceptedManagedChannelFactory extends ManagedChannelFactory { +private final ManagedChannelFactory channelFactory; +private final List interceptors; + +private InterceptedManagedChannelFactory( +ManagedChannelFactory managedChannelFactory, List interceptors) { + this.channelFactory = managedChannelFactory; + this.interceptors = interceptors; +} + +@Override +protected ManagedChannel forDescriptorOnly(ApiServiceDescriptor apiServiceDescriptor) { + return forDescriptor(apiServiceDescriptor, interceptors); +} + +@Override +protected ManagedChannel forDescriptor( +ApiServiceDescriptor apiServiceDescriptor, List interceptors) { + return channelFactory.forDescriptor(apiServiceDescriptor, interceptors); +} + +@Override +public ManagedChannelFactory withInterceptors(List interceptors) { + return new InterceptedManagedChannelFactory(channelFactory, interceptors); Review comment: Yes, I'm also curious about this. If it's not for factory chaining and isn't necessary for another reason, could it be removed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 108169) Time Spent: 3h (was: 2h 50m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 3h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. 
However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
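As the diff reads, `InterceptedManagedChannelFactory.withInterceptors` wraps the original `channelFactory` rather than `this`, so calling it twice keeps only the second list. If chaining is the intent, one option is for each wrapper to delegate to the factory it was created from and merge its own interceptors in. The sketch below models that in plain Java, reducing a "channel" to a `String` that names the target and its interceptors. `ChainableFactory`, `PlainFactory`, and `InterceptedFactory` are hypothetical names, not Beam or gRPC types.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model: a "channel" is a String naming the target and the interceptors
// applied to it. These classes are hypothetical, not Beam or gRPC types.
abstract class ChainableFactory {
  abstract String forDescriptor(String url, List<String> interceptors);

  String forDescriptor(String url) {
    return forDescriptor(url, List.of());
  }

  // Wrapping `this` (not the innermost factory) makes repeated calls accumulate.
  ChainableFactory withInterceptors(List<String> interceptors) {
    return new InterceptedFactory(this, interceptors);
  }
}

final class PlainFactory extends ChainableFactory {
  @Override
  String forDescriptor(String url, List<String> interceptors) {
    return url + " " + interceptors;
  }
}

final class InterceptedFactory extends ChainableFactory {
  private final ChainableFactory delegate;
  private final List<String> interceptors;

  InterceptedFactory(ChainableFactory delegate, List<String> interceptors) {
    this.delegate = delegate;
    this.interceptors = List.copyOf(interceptors);
  }

  @Override
  String forDescriptor(String url, List<String> outer) {
    // Interceptors added earlier (closer to the base factory) come first.
    List<String> merged = new ArrayList<>(interceptors);
    merged.addAll(outer);
    return delegate.forDescriptor(url, merged);
  }
}
```

With this shape, `new PlainFactory().withInterceptors(List.of("a")).withInterceptors(List.of("b")).forDescriptor("localhost:1")` yields `localhost:1 [a, b]`, whereas replacing the delegate with the innermost factory would drop `a`.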
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106089 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 26/May/18 00:57 Start Date: 26/May/18 00:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191034939 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java ## @@ -6,31 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.beam.runners.reference.testing; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +package org.apache.beam.fn.harness.control; -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. - * - * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. 
- */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { +import io.grpc.ClientInterceptor; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.stub.MetadataUtils; + +/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ +public class AddHarnessIdInterceptor { + private static final Key<String> ID_KEY = Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { -return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); + public static ClientInterceptor create(String harnessId) { Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 106089) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2h 40m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106090&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106090 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 26/May/18 00:57 Start Date: 26/May/18 00:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191034956 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java ## @@ -6,31 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.beam.runners.reference.testing; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +package org.apache.beam.fn.harness.control; -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. - * - * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. 
- */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { +import io.grpc.ClientInterceptor; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.stub.MetadataUtils; + +/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ +public class AddHarnessIdInterceptor { Review comment: I find this much more readable as a factory method than as an inline key and `MetadataUtils` call - it tells us exactly the thing it does. Issue Time Tracking --- Worklog Id: (was: 106090) Time Spent: 2h 50m (was: 2h 40m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2h 50m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
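The factory method tgroh defends hides the metadata key behind a single, intention-revealing call. Below is a JDK-only sketch of the same shape. `HeaderInterceptor` and `HarnessIdHeaders` are hypothetical stand-ins, and a plain `Map` plays the role of gRPC `Metadata`. In the PR itself the return type is `io.grpc.ClientInterceptor`, and the imports suggest it is built with `MetadataUtils`, though the method body is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a gRPC ClientInterceptor: it rewrites the outgoing
// header map of a call. Not the io.grpc API.
@FunctionalInterface
interface HeaderInterceptor extends UnaryOperator<Map<String, String>> {}

final class HarnessIdHeaders {
  // Header name used by the PR's Metadata.Key.
  static final String ID_KEY = "worker_id";

  private HarnessIdHeaders() {}

  // Callers only supply the id; the key name and header plumbing live here.
  static HeaderInterceptor create(String harnessId) {
    return headers -> {
      Map<String, String> withId = new HashMap<>(headers);
      withId.put(ID_KEY, harnessId);
      return withId;
    };
  }
}
```

A caller then writes `HarnessIdHeaders.create(id)` instead of repeating the key name and header-attaching code at every call site.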
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106086 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 26/May/18 00:57 Start Date: 26/May/18 00:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191034902 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ## @@ -115,43 +124,46 @@ public static void main(PipelineOptions options, } public static void main( + String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, ManagedChannelFactory channelFactory, StreamObserverFactory streamObserverFactory) { IdGenerator idGenerator = IdGenerators.decrementingLongs(); -try (BeamFnLoggingClient logging = new BeamFnLoggingClient( -options, -loggingApiServiceDescriptor, -channelFactory::forDescriptor)) { +try (BeamFnLoggingClient logging = Review comment: No. This is done to close the client automatically at the completion of the `try` block, and the client must exist to send intercepted LOG messages to the logging server.
Issue Time Tracking --- Worklog Id: (was: 106086) Time Spent: 2h 20m (was: 2h 10m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2h 20m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
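The point of this reply is that a try-with-resources variable earns its place even when the block never reads it. Declaring it ties the client's lifetime to the block and guarantees that `close()` runs on exit. Here is a minimal model in which `FakeLoggingClient` is a hypothetical stand-in for `BeamFnLoggingClient` that records when it opens and closes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BeamFnLoggingClient: constructing it "opens" the
// logging stream, and close() shuts it down.
final class FakeLoggingClient implements AutoCloseable {
  private final List<String> events;

  FakeLoggingClient(List<String> events) {
    this.events = events;
    events.add("logging client opened");
  }

  @Override
  public void close() {
    events.add("logging client closed");
  }
}

final class HarnessMain {
  static List<String> run() {
    List<String> events = new ArrayList<>();
    // `logging` is never read in the body; declaring it here is what keeps the
    // client open for the whole block and closes it afterwards.
    try (FakeLoggingClient logging = new FakeLoggingClient(events)) {
      events.add("harness running");
    }
    return events;
  }
}
```

`HarnessMain.run()` returns the events in order: opened, running, closed. javac may warn about the unreferenced resource under `-Xlint:try`, but this is not an error.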
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106088&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106088 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 26/May/18 00:57 Start Date: 26/May/18 00:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191034893 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ## @@ -115,43 +124,46 @@ public static void main(PipelineOptions options, } public static void main( + String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, ManagedChannelFactory channelFactory, StreamObserverFactory streamObserverFactory) { IdGenerator idGenerator = IdGenerators.decrementingLongs(); -try (BeamFnLoggingClient logging = new BeamFnLoggingClient( -options, -loggingApiServiceDescriptor, -channelFactory::forDescriptor)) { +try (BeamFnLoggingClient logging = +new BeamFnLoggingClient( +options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); - EnumMap Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2h 40m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. 
This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106085&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106085 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 26/May/18 00:57 Start Date: 26/May/18 00:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191034868 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ## @@ -80,23 +83,28 @@ public static void main(String[] args) throws Exception { System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR)); System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS)); -ObjectMapper objectMapper = new ObjectMapper().registerModules( -ObjectMapper.findModules(ReflectHelpers.findClassLoader())); -PipelineOptions options = objectMapper.readValue( -System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); +String id = System.getenv(HARNESS_ID); Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 106085) Time Spent: 2h 10m (was: 2h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2h 10m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation.
This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
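The new `main(String[])` reads the id with `System.getenv(HARNESS_ID)`, and `System.getenv` returns `null` for an unset variable, so a caller may want to fail fast. The sketch below assumes that the variable is literally named `HARNESS_ID` (the diff shows only the constant, not its value) and that a missing or empty id should abort startup. Both are assumptions, and `HarnessIdFromEnv` is a hypothetical helper.

```java
import java.util.Map;
import java.util.Optional;

final class HarnessIdFromEnv {
  // Hypothetical variable name: the diff shows only the constant HARNESS_ID,
  // not the string it holds.
  static final String HARNESS_ID = "HARNESS_ID";

  private HarnessIdFromEnv() {}

  // Takes the environment as a parameter so it can be tested without the real
  // process environment; production code would pass System.getenv().
  static String read(Map<String, String> env) {
    return Optional.ofNullable(env.get(HARNESS_ID))
        .filter(id -> !id.isEmpty())
        .orElseThrow(() -> new IllegalStateException(HARNESS_ID + " is not set"));
  }
}
```

In the harness, this would be called as `HarnessIdFromEnv.read(System.getenv())`.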
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106087&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106087 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 26/May/18 00:57 Start Date: 26/May/18 00:57 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191034964 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java ## @@ -69,21 +72,23 @@ private final CompletableFuture onFinish; public BeamFnControlClient( - Endpoints.ApiServiceDescriptor apiServiceDescriptor, - Function channelFactory, + String id, + ApiServiceDescriptor apiServiceDescriptor, + ManagedChannelFactory channelFactory, BiFunction< - StreamObserverClientFactory, - StreamObserver, - StreamObserver> + StreamObserverClientFactory, Review comment: This is autoformatted. Issue Time Tracking --- Worklog Id: (was: 106087) Time Spent: 2.5h (was: 2h 20m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2.5h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids.
This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106031&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106031 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 25/May/18 21:59 Start Date: 25/May/18 21:59 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191017457 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java ## @@ -6,31 +6,33 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ -package org.apache.beam.runners.reference.testing; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; -import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +package org.apache.beam.fn.harness.control; -/** - * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels. - * - * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. 
- */ -public class InProcessManagedChannelFactory extends ManagedChannelFactory { +import io.grpc.ClientInterceptor; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.stub.MetadataUtils; + +/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ +public class AddHarnessIdInterceptor { + private static final Key<String> ID_KEY = Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER); - @Override - public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { -return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); + public static ClientInterceptor create(String harnessId) { Review comment: Shall we have a null check for harnessId? Issue Time Tracking --- Worklog Id: (was: 106031) Time Spent: 1h 10m (was: 1h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 1h 10m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
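One answer to the null-check question is to validate inside the factory, so a missing id is reported at the point where it is passed in. The following is a JDK-only sketch using `Objects.requireNonNull`. `HarnessIdInterceptorFactory` is hypothetical and returns a header-rewriting function in place of a gRPC `ClientInterceptor`. Guava's `Preconditions.checkNotNull` would serve the same purpose.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.UnaryOperator;

final class HarnessIdInterceptorFactory {
  private HarnessIdInterceptorFactory() {}

  // Hypothetical factory. The null check runs when the interceptor is created,
  // so a missing id fails at startup with a clear message instead of a null
  // slipping into the outgoing headers of every later call.
  static UnaryOperator<Map<String, String>> create(String harnessId) {
    Objects.requireNonNull(harnessId, "harnessId");
    return headers -> {
      Map<String, String> out = new HashMap<>(headers);
      out.put("worker_id", harnessId);
      return out;
    };
  }
}
```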
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106037&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106037 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 25/May/18 21:59 Start Date: 25/May/18 21:59 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191012929 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ## @@ -70,13 +92,42 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { */ private static class Default extends ManagedChannelFactory { @Override -public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { +public ManagedChannel forDescriptor( +ApiServiceDescriptor apiServiceDescriptor, List<ClientInterceptor> interceptors) { return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) .usePlaintext(true) // Set the message size to max value here. The actual size is governed by the // buffer size in the layers above.
.maxInboundMessageSize(Integer.MAX_VALUE) + .intercept(interceptors) .build(); } } + + private static class InterceptedManagedChannelFactory extends ManagedChannelFactory { +private final ManagedChannelFactory channelFactory; +private final List<ClientInterceptor> interceptors; + +private InterceptedManagedChannelFactory( +ManagedChannelFactory managedChannelFactory, List<ClientInterceptor> interceptors) { + this.channelFactory = managedChannelFactory; + this.interceptors = interceptors; +} + +@Override +protected ManagedChannel forDescriptorOnly(ApiServiceDescriptor apiServiceDescriptor) { + return forDescriptor(apiServiceDescriptor, interceptors); +} + +@Override +protected ManagedChannel forDescriptor( +ApiServiceDescriptor apiServiceDescriptor, List<ClientInterceptor> interceptors) { + return channelFactory.forDescriptor(apiServiceDescriptor, interceptors); +} + +@Override +public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) { + return new InterceptedManagedChannelFactory(channelFactory, interceptors); Review comment: I suppose this method is for factory chaining. Should we pass `this` in that case? I would suggest removing chaining in this manner and providing a builder to do the chaining, something like this: `ManagedChannelFactoryBuilder.builder(channelFactory).withInterceptors(interceptors)`
Issue Time Tracking --- Worklog Id: (was: 106037) Time Spent: 1h 40m (was: 1.5h) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 1h 40m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
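The builder angoenka proposes (`ManagedChannelFactoryBuilder.builder(channelFactory).withInterceptors(interceptors)`) is a suggestion and does not appear in the diff. Here is a JDK-only sketch of that shape. `ChannelFactoryBuilder` is a hypothetical name, and the base factory is modeled as a function from a target URL and an interceptor list to a channel description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical builder modeled on the review suggestion; not a Beam or gRPC type.
final class ChannelFactoryBuilder {
  private final BiFunction<String, List<String>, String> base;
  private final List<String> interceptors = new ArrayList<>();

  private ChannelFactoryBuilder(BiFunction<String, List<String>, String> base) {
    this.base = base;
  }

  static ChannelFactoryBuilder builder(BiFunction<String, List<String>, String> base) {
    return new ChannelFactoryBuilder(base);
  }

  // Each call adds to the list, so chained calls accumulate rather than replace.
  ChannelFactoryBuilder withInterceptors(List<String> more) {
    interceptors.addAll(more);
    return this;
  }

  // Snapshot the list so later builder calls cannot change an already built factory.
  Function<String, String> build() {
    List<String> snapshot = List.copyOf(interceptors);
    return url -> base.apply(url, snapshot);
  }
}
```

With the builder owning the accumulation, the factory classes no longer need a `withInterceptors` method that has to decide whether to wrap `this` or the innermost factory.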
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106039&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106039 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 25/May/18 21:59 Start Date: 25/May/18 21:59 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191017692 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java ## @@ -69,21 +72,23 @@ private final CompletableFuture onFinish; public BeamFnControlClient( - Endpoints.ApiServiceDescriptor apiServiceDescriptor, - Function channelFactory, + String id, + ApiServiceDescriptor apiServiceDescriptor, + ManagedChannelFactory channelFactory, BiFunction< - StreamObserverClientFactory, - StreamObserver, - StreamObserver> + StreamObserverClientFactory, Review comment: Reformat? Issue Time Tracking --- Worklog Id: (was: 106039) Time Spent: 2h (was: 1h 50m) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 2h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids.
This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
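The issue description explains why the id matters: the runner must recognize each incoming harness connection by the worker id it assigned. Below is a hypothetical runner-side registry that illustrates this routing, keyed by the `worker_id` header value. `WorkerRegistry` and its rejection rules are assumptions made for illustration, not the runner's actual code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: the runner assigns ids when it creates harnesses and
// routes each incoming control connection by its "worker_id" header.
final class WorkerRegistry<C> {
  private final Map<String, C> connections = new ConcurrentHashMap<>();

  // Returns false for a missing id, or for an id that already has a connection.
  boolean register(Map<String, String> headers, C connection) {
    String id = headers.get("worker_id");
    if (id == null) {
      return false; // without an id, the runner cannot tell harnesses apart
    }
    return connections.putIfAbsent(id, connection) == null;
  }

  Optional<C> lookup(String workerId) {
    return Optional.ofNullable(connections.get(workerId));
  }
}
```

Several in-process harnesses sharing one runner can be told apart only if each sends a distinct id, which is what the PR adds.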
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106038&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106038 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 25/May/18 21:59 Start Date: 25/May/18 21:59 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191015522 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ## @@ -115,43 +124,46 @@ public static void main(PipelineOptions options, } public static void main( + String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, ManagedChannelFactory channelFactory, StreamObserverFactory streamObserverFactory) { IdGenerator idGenerator = IdGenerators.decrementingLongs(); -try (BeamFnLoggingClient logging = new BeamFnLoggingClient( -options, -loggingApiServiceDescriptor, -channelFactory::forDescriptor)) { +try (BeamFnLoggingClient logging = +new BeamFnLoggingClient( +options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); - EnumMap Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 1h 50m > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container. However, in-process > harnesses never specify worker ids. 
This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106034&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106034 ] ASF GitHub Bot logged work on BEAM-4145: Author: ASF GitHub Bot Created on: 25/May/18 21:59 Start Date: 25/May/18 21:59 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness URL: https://github.com/apache/beam/pull/5456#discussion_r191013612 ## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ## @@ -41,7 +42,24 @@ public static ManagedChannelFactory createEpoll() { return new Epoll(); } - public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor); + public final ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { +return forDescriptorOnly(apiServiceDescriptor); + } + + protected ManagedChannel forDescriptorOnly(ApiServiceDescriptor descriptor) { Review comment: Based on the previous comment, we can remove this method. Issue Time Tracking --- Worklog Id: (was: 106034) > Java SDK Harness populates control request headers with worker id > - > > Key: BEAM-4145 > URL: https://issues.apache.org/jira/browse/BEAM-4145 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Ben Sidhom >Assignee: Thomas Groh >Priority: Minor > Time Spent: 1.5h > Remaining Estimate: 0h > > Runner code needs to be able to identify incoming harness connections by the > worker ids that it assigns to them on creation. This is currently done by the > go boot code when the harness runs in a docker container.
However, in-process > harnesses never specify worker ids. This prevents in-process harnesses from > being multiplexed by a runner (most likely the ULR and test code).
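In the diff, the public one-argument `forDescriptor` is `final` and delegates to a protected `forDescriptorOnly` hook, which `InterceptedManagedChannelFactory` overrides to pass its stored interceptors. If interceptors are supplied some other way, such as the suggested builder, the hook has no remaining job, and the one-argument method can call the two-argument form directly. Below is a hypothetical sketch of that reduced shape, with `String` standing in for `ManagedChannel`. `SimpleChannelFactory` and `DescribingChannelFactory` are illustrative names.

```java
import java.util.List;

// Hypothetical reduced shape with no forDescriptorOnly hook.
abstract class SimpleChannelFactory {
  // Always means "no extra interceptors"; final so subclasses cannot change that.
  final String forDescriptor(String url) {
    return forDescriptor(url, List.of());
  }

  // The single extension point: build a channel with the given interceptors.
  protected abstract String forDescriptor(String url, List<String> interceptors);
}

final class DescribingChannelFactory extends SimpleChannelFactory {
  @Override
  protected String forDescriptor(String url, List<String> interceptors) {
    return url + " " + interceptors;
  }
}
```

The trade-off is that a subclass can no longer give the one-argument call a default set of interceptors; that responsibility moves to whatever composes the factory.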
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106032&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106032 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191015783

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ##
@@ -115,43 +124,46 @@ public static void main(PipelineOptions options,
   }
   public static void main(
+      String id,
       PipelineOptions options,
       Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
       ManagedChannelFactory channelFactory,
       StreamObserverFactory streamObserverFactory) {
     IdGenerator idGenerator = IdGenerators.decrementingLongs();
-    try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
-        options,
-        loggingApiServiceDescriptor,
-        channelFactory::forDescriptor)) {
+    try (BeamFnLoggingClient logging =

Review comment:
logging is not used. Shall we remove it?

Issue Time Tracking
---
Worklog Id: (was: 106032)
Time Spent: 1h 20m (was: 1h 10m)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
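A variable declared in try-with-resources can matter even when the body never references it, because the statement scopes the resource's lifetime: it is opened on entry and `close()` runs on exit, including on exceptions (javac can flag the unreferenced name under `-Xlint:try`). Whether `BeamFnLoggingClient` relies on staying open for the harness's lifetime is not shown in the diff; the minimal sketch below only illustrates the Java semantics, using a hypothetical `LoggingClient`.

```java
import java.util.ArrayList;
import java.util.List;

public class ResourceLifetimeSketch {
  /** Records lifecycle events so their order can be inspected. */
  static final List<String> EVENTS = new ArrayList<>();

  /** Hypothetical stand-in for a logging client that is active while open. */
  static class LoggingClient implements AutoCloseable {
    LoggingClient() {
      EVENTS.add("logging-open");
    }

    @Override
    public void close() {
      EVENTS.add("logging-closed");
    }
  }

  static void runHarness() {
    // 'logging' is never referenced, but the try block still scopes its lifetime:
    // it is opened before the work runs and closed afterwards.
    try (LoggingClient logging = new LoggingClient()) {
      EVENTS.add("control-loop");
    }
  }

  public static void main(String[] args) {
    runHarness();
    System.out.println(EVENTS);
  }
}
```

If the client is needed only for that lifetime, keeping it in the try header is the idiomatic form; if it has no effect while open, removing it is safe.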
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106033&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106033 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191017563

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/AddHarnessIdInterceptor.java ##
@@ -6,31 +6,33 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
-package org.apache.beam.runners.reference.testing;
-import io.grpc.ManagedChannel;
-import io.grpc.inprocess.InProcessChannelBuilder;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+package org.apache.beam.fn.harness.control;
-/**
- * A {@link org.apache.beam.sdk.fn.channel.ManagedChannelFactory} that uses in-process channels.
- *
- * The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name.
- */
-public class InProcessManagedChannelFactory extends ManagedChannelFactory {
+import io.grpc.ClientInterceptor;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.stub.MetadataUtils;
+
+/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */
+public class AddHarnessIdInterceptor {

Review comment:
Not sure if we need a repository of interceptors. Just putting it as a thought.

Issue Time Tracking
---
Worklog Id: (was: 106033)
Time Spent: 1.5h (was: 1h 20m)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
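What `AddHarnessIdInterceptor` is described as doing, attaching the harness ID to every outgoing message's metadata, can be sketched without gRPC. In gRPC Java the usual building blocks are `Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER)` and `MetadataUtils.newAttachHeadersInterceptor(Metadata)`, and the imports in the diff suggest the real class uses them, though that is an inference. The stdlib-only sketch below models metadata as a `Map`; the header key `worker_id` is an assumption taken from the PR title.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class HarnessIdInterceptorSketch {
  /** Header key; "worker_id" follows the PR title, the exact key is an assumption. */
  static final String WORKER_ID_KEY = "worker_id";

  /** Returns an interceptor that copies the outgoing headers and adds the harness id. */
  static UnaryOperator<Map<String, String>> addHarnessId(String harnessId) {
    if (harnessId == null || harnessId.isEmpty()) {
      throw new IllegalArgumentException("harness id must be non-empty");
    }
    return headers -> {
      Map<String, String> withId = new HashMap<>(headers);
      withId.put(WORKER_ID_KEY, harnessId);
      return Collections.unmodifiableMap(withId);
    };
  }

  public static void main(String[] args) {
    UnaryOperator<Map<String, String>> interceptor = addHarnessId("worker-1");
    System.out.println(interceptor.apply(Collections.singletonMap("content-type", "application/grpc")));
  }
}
```

Validating the ID once, when the interceptor is built, means a misconfigured harness fails at startup rather than on its first control request.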
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106029&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106029 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191014342

## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ##
@@ -70,13 +92,42 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
    */
   private static class Default extends ManagedChannelFactory {
     @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+    public ManagedChannel forDescriptor(
+        ApiServiceDescriptor apiServiceDescriptor, List interceptors) {
       return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
           .usePlaintext(true)
           // Set the message size to max value here. The actual size is governed by the
           // buffer size in the layers above.
           .maxInboundMessageSize(Integer.MAX_VALUE)
+          .intercept(interceptors)
           .build();
     }
   }
+
+  private static class InterceptedManagedChannelFactory extends ManagedChannelFactory {

Review comment:
What is the purpose of this class? Given the method `ManagedChannelFactory.forDescriptor(ApiServiceDescriptor apiServiceDescriptor, List interceptors)`, all implementations of ManagedChannelFactory should support interception, and if they do not, they should throw an exception.

Issue Time Tracking
---
Worklog Id: (was: 106029)
Time Spent: 50m (was: 40m)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
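The contract proposed in this review comment (every factory applies all interceptors or throws) can be sketched as follows. The types are hypothetical stdlib stand-ins, and `NoInterceptionFactory` is an invented example of a transport that cannot intercept; the point is that it rejects interceptors loudly instead of silently dropping them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class InterceptionContractSketch {
  /** Hypothetical stand-in for io.grpc.ClientInterceptor. */
  interface Interceptor extends UnaryOperator<Map<String, String>> {}

  /** Hypothetical stand-in for ManagedChannel. */
  static final class Channel {
    final String target;
    final List<Interceptor> interceptors;

    Channel(String target, List<Interceptor> interceptors) {
      this.target = target;
      this.interceptors = Collections.unmodifiableList(new ArrayList<>(interceptors));
    }
  }

  abstract static class ChannelFactory {
    /** Contract: apply every interceptor, or throw; never drop them silently. */
    abstract Channel forDescriptor(String url, List<Interceptor> interceptors);
  }

  /** A transport that supports interception. */
  static final class DefaultFactory extends ChannelFactory {
    @Override
    Channel forDescriptor(String url, List<Interceptor> interceptors) {
      return new Channel(url, interceptors);
    }
  }

  /** A hypothetical transport that cannot attach interceptors, so it refuses them. */
  static final class NoInterceptionFactory extends ChannelFactory {
    @Override
    Channel forDescriptor(String url, List<Interceptor> interceptors) {
      if (!interceptors.isEmpty()) {
        throw new UnsupportedOperationException(
            "cannot apply " + interceptors.size() + " interceptor(s) to " + url);
      }
      return new Channel(url, interceptors);
    }
  }

  public static void main(String[] args) {
    Interceptor noop = headers -> headers;
    List<Interceptor> one = Collections.singletonList(noop);
    System.out.println(new DefaultFactory().forDescriptor("localhost:8099", one).interceptors.size());
    try {
      new NoInterceptionFactory().forDescriptor("localhost:8099", one);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```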
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106035&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106035 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191015296

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ##
@@ -80,23 +83,28 @@ public static void main(String[] args) throws Exception {
     System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
     System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
-    ObjectMapper objectMapper = new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-    PipelineOptions options = objectMapper.readValue(
-        System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
+    String id = System.getenv(HARNESS_ID);

Review comment:
Shall we also log this parameter like the other parameters?

Issue Time Tracking
---
Worklog Id: (was: 106035)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
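Following the suggestion to log the new parameter the same way as the control location and pipeline options, here is a minimal sketch. It assumes a hypothetical environment variable literally named `HARNESS_ID` (the diff shows the constant, not the string it holds) and reuses the `System.out.format` style of the surrounding lines. Failing fast on a missing ID is a choice made by this sketch, not something the diff shows.

```java
import java.util.Collections;
import java.util.Map;

public class HarnessIdEnvSketch {
  /** Hypothetical variable name: the diff shows only the constant HARNESS_ID, not its value. */
  static final String HARNESS_ID = "HARNESS_ID";

  /**
   * Reads the harness id from the given environment, logs it like the other startup
   * parameters, and fails fast when it is missing (a design choice of this sketch).
   */
  static String readHarnessId(Map<String, String> env) {
    String id = env.get(HARNESS_ID);
    System.out.format("SDK Harness ID %s%n", id);
    if (id == null || id.isEmpty()) {
      throw new IllegalStateException("Environment variable " + HARNESS_ID + " is not set");
    }
    return id;
  }

  public static void main(String[] args) {
    // In the harness this would be System.getenv(); a Map keeps the sketch testable.
    readHarnessId(Collections.singletonMap(HARNESS_ID, "worker-1"));
  }
}
```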
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106030 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191010767

## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ##
@@ -41,7 +42,24 @@ public static ManagedChannelFactory createEpoll() {
     return new Epoll();
   }
-  public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
+  public final ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+    return forDescriptorOnly(apiServiceDescriptor);

Review comment:
Shall we do away with this indirection? An implementer can always override this public method (if we drop the `final`).

Issue Time Tracking
---
Worklog Id: (was: 106030)
Time Spent: 1h (was: 50m)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
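The trade-off behind this comment: a `final` public method plus a protected hook (the template-method pattern) earns its keep only when the base class does something every call must go through. Without such a step, the hook is just indirection. In the minimal sketch below, channels are modeled as strings and a URL check stands in as a hypothetical invariant.

```java
import java.util.Objects;

public class TemplateMethodSketch {
  /** With the indirection: the public entry point is final and can enforce invariants. */
  abstract static class GuardedFactory {
    public final String forDescriptor(String url) {
      Objects.requireNonNull(url, "url");
      if (url.isEmpty()) {
        throw new IllegalArgumentException("empty url");
      }
      return forDescriptorOnly(url);
    }

    /** Subclasses implement this hook; callers are expected to use forDescriptor. */
    protected abstract String forDescriptorOnly(String url);
  }

  static final class GuardedDefault extends GuardedFactory {
    @Override
    protected String forDescriptorOnly(String url) {
      return "channel:" + url;
    }
  }

  /** Without the indirection: subclasses override the public method directly. */
  abstract static class OpenFactory {
    public abstract String forDescriptor(String url);
  }

  static final class OpenDefault extends OpenFactory {
    @Override
    public String forDescriptor(String url) {
      // Nothing forces this override to validate its input.
      return "channel:" + url;
    }
  }

  public static void main(String[] args) {
    System.out.println(new GuardedDefault().forDescriptor("localhost:8099"));
    System.out.println(new OpenDefault().forDescriptor(""));
  }
}
```

If the base class has no such shared step, the open version is simpler, which is the reviewer's point.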
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106036&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106036 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191014723

## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ##
@@ -70,13 +92,42 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
    */
   private static class Default extends ManagedChannelFactory {
     @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+    public ManagedChannel forDescriptor(

Review comment:
As this method requires more than just a descriptor, shall we rename it to something like `createChannel` or `getChannel`?

Issue Time Tracking
---
Worklog Id: (was: 106036)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
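Suppose the rename is adopted while other modules still call `forDescriptor`. One common migration path is to keep the old name as a deprecated delegate for a while. The minimal sketch below uses hypothetical simplified signatures: a URL string goes in, the channel is modeled as a string, and interceptors are modeled as names.

```java
import java.util.Arrays;
import java.util.List;

public class RenameSketch {
  abstract static class ChannelFactory {
    /** New name suggested in review; interceptors are modeled as names (hypothetical). */
    public abstract String createChannel(String url, List<String> interceptorNames);

    /** Old name kept as a thin, deprecated delegate so existing callers keep compiling. */
    @Deprecated
    public final String forDescriptor(String url, List<String> interceptorNames) {
      return createChannel(url, interceptorNames);
    }
  }

  static final class DefaultFactory extends ChannelFactory {
    @Override
    public String createChannel(String url, List<String> interceptorNames) {
      return url + " " + interceptorNames;
    }
  }

  public static void main(String[] args) {
    System.out.println(
        new DefaultFactory().createChannel("localhost:8099", Arrays.asList("worker-id")));
  }
}
```

Making the old method `final` means subclasses implement only the new name, so the two cannot drift apart.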
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=106028&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106028 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 25/May/18 21:59
Start Date: 25/May/18 21:59
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#discussion_r191012317

## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/ManagedChannelFactory.java ##
@@ -70,13 +92,42 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
    */
   private static class Default extends ManagedChannelFactory {
     @Override
-    public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
+    public ManagedChannel forDescriptor(
+        ApiServiceDescriptor apiServiceDescriptor, List interceptors) {
       return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
           .usePlaintext(true)
           // Set the message size to max value here. The actual size is governed by the
           // buffer size in the layers above.
           .maxInboundMessageSize(Integer.MAX_VALUE)
+          .intercept(interceptors)
           .build();
     }
   }
+
+  private static class InterceptedManagedChannelFactory extends ManagedChannelFactory {
+    private final ManagedChannelFactory channelFactory;
+    private final List interceptors;
+
+    private InterceptedManagedChannelFactory(
+        ManagedChannelFactory managedChannelFactory, List interceptors) {
+      this.channelFactory = managedChannelFactory;
+      this.interceptors = interceptors;
+    }
+
+    @Override
+    protected ManagedChannel forDescriptorOnly(ApiServiceDescriptor apiServiceDescriptor) {
+      return forDescriptor(apiServiceDescriptor, interceptors);
+    }
+
+    @Override
+    protected ManagedChannel forDescriptor(
+        ApiServiceDescriptor apiServiceDescriptor, List interceptors) {
+      return channelFactory.forDescriptor(apiServiceDescriptor, interceptors);

Review comment:
The behavior is not well defined. Although this is an intercepted factory, the original interceptors (the ones passed to the constructor) are not applied.

Issue Time Tracking
---
Worklog Id: (was: 106028)
Time Spent: 40m (was: 0.5h)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
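As the comment notes, a direct two-argument call on the decorator forwards only the per-call interceptors, so the ones captured in the constructor are lost. One possible fix is to merge the decorator's own interceptors with the per-call ones before delegating. The minimal stdlib-only sketch below shows that fix with hypothetical simplified types: interceptors are modeled as names, and channels as plain objects that record them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class InterceptedFactorySketch {
  /** Hypothetical channel: records the names of the interceptors it would apply. */
  static final class Channel {
    final String target;
    final List<String> interceptors;

    Channel(String target, List<String> interceptors) {
      this.target = target;
      this.interceptors = Collections.unmodifiableList(new ArrayList<>(interceptors));
    }
  }

  abstract static class ChannelFactory {
    Channel forDescriptor(String url) {
      return forDescriptor(url, Collections.emptyList());
    }

    abstract Channel forDescriptor(String url, List<String> interceptors);
  }

  static final class DefaultFactory extends ChannelFactory {
    @Override
    Channel forDescriptor(String url, List<String> interceptors) {
      return new Channel(url, interceptors);
    }
  }

  /** Decorator that always applies its own interceptors in addition to per-call ones. */
  static final class InterceptedFactory extends ChannelFactory {
    private final ChannelFactory delegate;
    private final List<String> interceptors;

    InterceptedFactory(ChannelFactory delegate, List<String> interceptors) {
      this.delegate = delegate;
      this.interceptors = new ArrayList<>(interceptors);
    }

    @Override
    Channel forDescriptor(String url, List<String> callInterceptors) {
      // Combine both lists so the constructor's interceptors are never silently dropped.
      // Putting the constructor's list first is an arbitrary choice of this sketch.
      List<String> combined = new ArrayList<>(interceptors);
      combined.addAll(callInterceptors);
      return delegate.forDescriptor(url, combined);
    }
  }

  public static void main(String[] args) {
    ChannelFactory factory =
        new InterceptedFactory(new DefaultFactory(), Collections.singletonList("add-worker-id"));
    System.out.println(factory.forDescriptor("localhost:8099").interceptors);
    System.out.println(
        factory.forDescriptor("localhost:8099", Collections.singletonList("per-call")).interceptors);
  }
}
```

Both entry points now go through the same merge, so the result no longer depends on which overload the caller picked.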
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=105714&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105714 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 24/May/18 20:29
Start Date: 24/May/18 20:29
Worklog Time Spent: 10m
Work Description: tgroh commented on issue #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#issuecomment-391848961

run java precommit

Issue Time Tracking
---
Worklog Id: (was: 105714)
Time Spent: 0.5h (was: 20m)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=105267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105267 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 23/May/18 18:58
Start Date: 23/May/18 18:58
Worklog Time Spent: 10m
Work Description: tgroh commented on issue #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456#issuecomment-391459842

R: @angoenka @youngoli

Issue Time Tracking
---
Worklog Id: (was: 105267)
Time Spent: 20m (was: 10m)

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
[jira] [Work logged] (BEAM-4145) Java SDK Harness populates control request headers with worker id
[ https://issues.apache.org/jira/browse/BEAM-4145?focusedWorklogId=105258&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105258 ]

ASF GitHub Bot logged work on BEAM-4145:

Author: ASF GitHub Bot
Created on: 23/May/18 18:45
Start Date: 23/May/18 18:45
Worklog Time Spent: 10m
Work Description: tgroh opened a new pull request #5456: [BEAM-4145] Populate the worker_id metadata in the Java SDK Harness
URL: https://github.com/apache/beam/pull/5456

The ID is required for any control service with multiple clients, which is the general case.

Enable `ManagedChannelFactory` implementations to register interceptors on all of the created channels.
Add an interceptor which attaches the ID to the metadata.
Use this interceptor in the Sdk Harness Control Client.
Populate the ID passed via the container contract in the boot go code.

Issue Time Tracking
---
Worklog Id: (was: 105258)
Time Spent: 10m
Remaining Estimate: 0h

> Java SDK Harness populates control request headers with worker id
> -
>
> Key: BEAM-4145
> URL: https://issues.apache.org/jira/browse/BEAM-4145
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Ben Sidhom
> Assignee: Thomas Groh
> Priority: Minor
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Runner code needs to be able to identify incoming harness connections by the
> worker ids that it assigns to them on creation. This is currently done by the
> go boot code when the harness runs in a docker container. However, in-process
> harnesses never specify worker ids. This prevents in-process harnesses from
> being multiplexed by a runner (most likely the ULR and test code).
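The PR description and the issue summarize the mechanism: the harness attaches its ID to control-stream metadata so a control service with several clients can tell the clients apart. The runner can then match each connection to the worker it created. Below is a minimal sketch of the receiving side, assuming a hypothetical runner-side registry (this is not Beam's actual runner code) and the header key `worker_id`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ControlConnectionRegistrySketch {
  /** Header key; following the PR title, the exact key is an assumption. */
  static final String WORKER_ID_KEY = "worker_id";

  /** Hypothetical handle for one harness's control stream. */
  static final class Connection {
    final String workerId;

    Connection(String workerId) {
      this.workerId = workerId;
    }
  }

  private final Map<String, Connection> byWorkerId = new ConcurrentHashMap<>();

  /** Called when a harness opens its control stream; the headers carry its worker id. */
  Connection register(Map<String, String> headers) {
    String id = headers.get(WORKER_ID_KEY);
    if (id == null || id.isEmpty()) {
      throw new IllegalArgumentException("control connection without " + WORKER_ID_KEY);
    }
    Connection connection = new Connection(id);
    // putIfAbsent keeps the first registration and reports a clash atomically.
    if (byWorkerId.putIfAbsent(id, connection) != null) {
      throw new IllegalStateException("duplicate worker id " + id);
    }
    return connection;
  }

  /** Routes work to the harness the runner created under this id, or null if unknown. */
  Connection lookup(String workerId) {
    return byWorkerId.get(workerId);
  }

  public static void main(String[] args) {
    ControlConnectionRegistrySketch registry = new ControlConnectionRegistrySketch();
    registry.register(Collections.singletonMap(WORKER_ID_KEY, "worker-1"));
    registry.register(Collections.singletonMap(WORKER_ID_KEY, "worker-2"));
    System.out.println(registry.lookup("worker-2").workerId);
  }
}
```

Without the ID, the registry has no key for an incoming connection. That is the gap the issue describes for in-process harnesses.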