[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97661 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 02/May/18 19:12
Start Date: 02/May/18 19:12
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
new file mode 100644
index 000..a8b582640b5
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.artifact;
+
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
+
+/**
+ * Makes artifacts available to an ArtifactRetrievalService by
+ * encapsulating runner-specific resources.
+ */
+public interface ArtifactSource {
+
+  /**
+   * Get the artifact manifest available from this source.
+   */
+  Manifest getManifest() throws IOException;
+
+  /**
+   * Get an artifact by its name.
+   */
+  void getArtifact(String name, StreamObserver<ArtifactChunk> responseObserver);
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java
new file mode 100644
index 000..96286fa0eef
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A factory that has all job-scoped information, and can be combined with stage-scoped information
+ * to create a {@link StageBundleFactory}.
+ *
+ * Releases all job-scoped resources when closed.
+ */
+public interface JobBundleFactory extends AutoCloseable {
+  StageBundleFactory forStage(
+      ExecutableStage executableStage, StateRequestHandler stateRequestHandler);
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java
new file mode 100644
index 000..ae818ca0726
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java
@
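The scoping described in the `JobBundleFactory` Javadoc (a job-scoped factory that hands out stage-scoped factories, each releasing its resources on close) maps naturally onto try-with-resources. The following is only a rough sketch under stated assumptions, not Beam's implementation: `SketchJobFactory` and `SketchStageFactory` are hypothetical stand-ins that take a plain `String` stage id instead of an `ExecutableStage` and `StateRequestHandler`, and they do nothing except record the order in which things happen.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
  /** Hypothetical stand-in for a stage-scoped factory; not the Beam interface. */
  interface SketchStageFactory extends AutoCloseable {
    String newBundle();

    @Override
    void close(); // narrowed so this sketch needs no checked-exception handling
  }

  /** Hypothetical stand-in for a job-scoped factory; not the Beam interface. */
  interface SketchJobFactory extends AutoCloseable {
    SketchStageFactory forStage(String stageId);

    @Override
    void close();
  }

  /** Runs one job with a single stage and returns the recorded event order. */
  static List<String> run() {
    List<String> events = new ArrayList<>();
    SketchJobFactory job = new SketchJobFactory() {
      @Override
      public SketchStageFactory forStage(String stageId) {
        events.add("open stage " + stageId);
        return new SketchStageFactory() {
          @Override
          public String newBundle() {
            events.add("bundle for " + stageId);
            return stageId + "-bundle";
          }

          @Override
          public void close() {
            events.add("close stage " + stageId);
          }
        };
      }

      @Override
      public void close() {
        events.add("close job");
      }
    };
    // try-with-resources closes in reverse order of acquisition:
    // the stage-scoped factory first, then the job-scoped factory.
    try (SketchJobFactory j = job;
        SketchStageFactory stage = j.forStage("s1")) {
      stage.newBundle();
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Nesting the two scopes this way means stage-bound resources can never outlive the job-bound resources they depend on, which appears to be the intent behind making both interfaces `AutoCloseable`.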
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97625 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 02/May/18 18:22
Start Date: 02/May/18 18:22
Worklog Time Spent: 10m
Work Description: axelmagn closed pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152

This is a PR merged from a forked repository.
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97624&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97624 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 02/May/18 18:22
Start Date: 02/May/18 18:22
Worklog Time Spent: 10m
Work Description: axelmagn commented on issue #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#issuecomment-386073836

Oops. changed it but didn't push it. should be fixed now.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking --- Worklog Id: (was: 97624) Time Spent: 28h 40m (was: 28.5h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 28h 40m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97627&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97627 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 02/May/18 18:22
Start Date: 02/May/18 18:22
Worklog Time Spent: 10m
Work Description: axelmagn opened a new pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152

These are some interfaces that will be used on the worker to manage the lifetimes of remote environments and the related RPC services. The key addition is `SdkHarnessManager`, which is responsible for managing these resources and can provide a `RemoteEnvironment` to runner operators.

Follow this checklist to help us incorporate your contribution quickly and easily:

- [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
- [x] Write a pull request description that is detailed enough to understand:
  - [x] What the pull request does
  - [x] Why it does it
  - [x] How it does it
  - [x] Why this approach
- [x] Each commit in the pull request should have a meaningful subject line and body.
- [x] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

Issue Time Tracking --- Worklog Id: (was: 97627) Time Spent: 29h (was: 28h 50m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97612&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97612 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 17:57 Start Date: 02/May/18 17:57 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185584927 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A factory that can create output receivers during an executable stage. + */ +public interface OutputReceiverFactory { + /** + * Get a new {@link RemoteOutputReceiver} for an output PCollection. + */ + RemoteOutputReceiver create(String pCollectionId); Review comment: fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 97612) Time Spent: 28.5h (was: 28h 20m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97608 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 17:40 Start Date: 02/May/18 17:40 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185579308 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A factory that can create output receivers during an executable stage. + */ +public interface OutputReceiverFactory { + /** + * Get a new {@link RemoteOutputReceiver} for an output PCollection. 
+ */
+ RemoteOutputReceiver create(String pCollectionId);

Review comment: This looks like it should be stateless, but as @bsidhom and I discussed, within a single instance the caller should probably only ever invoke it once per `PCollection`. That restriction should be documented explicitly so that other integrators understand the permitted scoping.

Issue Time Tracking --- Worklog Id: (was: 97608) Time Spent: 28h 20m (was: 28h 10m)
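One way to make the "invoke it at most once per `PCollection`" expectation visible to integrators, beyond a Javadoc note, is a guard around the factory. The sketch below is an illustration only: `Receiver`, `ReceiverFactory`, and `OncePerPCollectionFactory` are hypothetical names rather than types from this pull request, and whether a repeated call should fail (as here) or return a cached receiver is a choice the thread leaves open.

```java
import java.util.HashSet;
import java.util.Set;

public class OncePerPCollectionSketch {
  /** Hypothetical placeholder for a receiver type. */
  interface Receiver {}

  /** Hypothetical factory shape, mirroring create(String pCollectionId). */
  interface ReceiverFactory {
    Receiver create(String pCollectionId);
  }

  /** Wraps a factory and rejects a second request for the same PCollection id. */
  static final class OncePerPCollectionFactory implements ReceiverFactory {
    private final ReceiverFactory delegate;
    private final Set<String> created = new HashSet<>();

    OncePerPCollectionFactory(ReceiverFactory delegate) {
      this.delegate = delegate;
    }

    @Override
    public synchronized Receiver create(String pCollectionId) {
      // Set.add returns false when the id was already present.
      if (!created.add(pCollectionId)) {
        throw new IllegalStateException("Receiver already created for " + pCollectionId);
      }
      return delegate.create(pCollectionId);
    }
  }

  /** Returns true if a repeated request for the same id is rejected. */
  static boolean rejectsSecondCall() {
    ReceiverFactory factory = new OncePerPCollectionFactory(id -> new Receiver() {});
    factory.create("pc1");
    factory.create("pc2");
    try {
      factory.create("pc1");
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(rejectsSecondCall());
  }
}
```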
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97607&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97607 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 17:37 Start Date: 02/May/18 17:37 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185578236 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. 
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.

Review comment: patched my commit to implement this. Let me know if you had something different in mind.

Issue Time Tracking --- Worklog Id: (was: 97607) Time Spent: 28h 10m (was: 28h)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97597 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 17:21 Start Date: 02/May/18 17:21 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185572115 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. 
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.

Review comment: Instead of a map of `target -> output receiver`, I'd go for a factory that consumes a `PCollectionId` and produces the appropriate `RemoteOutputReceiver`. This may later be extended to also consume the `PTransformId` that produced the partial `PCollection` that the target represents.

Issue Time Tracking --- Worklog Id: (was: 97597) Time Spent: 28h (was: 27h 50m)
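The difference between passing a prebuilt `target -> output receiver` map and passing a factory keyed by `PCollectionId` can be sketched roughly as follows. Everything here is a hypothetical illustration: `SketchReceiver` and `SketchReceiverFactory` are invented names, elements are plain strings, and the in-memory map stands in for whatever sink a runner would really use. The point is only that the caller supplies one function and receivers are produced on demand per `PCollection` id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReceiverFactorySketch {
  /** Hypothetical receiver: accepts elements of one output PCollection. */
  interface SketchReceiver {
    void accept(String element);
  }

  /** Hypothetical factory: consumes a PCollection id and produces its receiver. */
  interface SketchReceiverFactory {
    SketchReceiver create(String pCollectionId);
  }

  /** Routes two elements through factory-created receivers and returns the sinks. */
  static Map<String, List<String>> run() {
    Map<String, List<String>> outputs = new LinkedHashMap<>();
    // The factory decides, per id, where elements go; no map of receivers
    // has to be built up front for every target.
    SketchReceiverFactory factory = pCollectionId -> {
      List<String> sink = outputs.computeIfAbsent(pCollectionId, k -> new ArrayList<>());
      return sink::add;
    };
    factory.create("pc.main").accept("a");
    factory.create("pc.side").accept("b");
    return outputs;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```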
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97596 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 17:20 Start Date: 02/May/18 17:20 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185572115 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. 
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.

Review comment: Instead of a map of `target -> output receiver`, I'd go for a factory that consumes a `PTransformId` and a `PCollectionId` and produces the appropriate `RemoteOutputReceiver`.

Issue Time Tracking --- Worklog Id: (was: 97596) Time Spent: 27h 50m (was: 27h 40m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97592&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97592 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 17:16 Start Date: 02/May/18 17:16 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185572115 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. 
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.

Review comment: Instead of a map of `target -> output receiver`, I'd go for a factory that consumes a `ProcessBundleDescriptor` and a `Target` and produces the appropriate `RemoteOutputReceiver`.

Issue Time Tracking --- Worklog Id: (was: 97592) Time Spent: 27h 40m (was: 27.5h)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97326 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 02/May/18 02:31 Start Date: 02/May/18 02:31 Worklog Time Spent: 10m Work Description: axelmagn commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-385847277 It looks like the build failures on :javaPreCommit are from unrelated packages. Is this a known issue, or am I breaking them? Issue Time Tracking --- Worklog Id: (was: 97326) Time Spent: 27.5h (was: 27h 20m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97293 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 23:34 Start Date: 01/May/18 23:34 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185361373 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. 
+ */ +public interface StageBundleFactory extends AutoCloseable { + /** + * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. Review comment: We were doing some weird munging in the hacking branch to correlate Targets with output destinations. This required reaching into `ProcessBundleDescriptor` in order to pull out the correct output tag: https://github.com/bsidhom/beam/blob/3e9b24a1247033e5d75f80e106d3d75052389799/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java#L132 This is no longer the correct abstraction to use now that we're concealing the bundle descriptor from the client. @tgroh How else can we identify output nodes? Issue Time Tracking --- Worklog Id: (was: 97293) Time Spent: 27h 20m (was: 27h 10m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97282&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97282 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:30 Start Date: 01/May/18 22:30 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185351524 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteOutputReceiver.java ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; + +/** + * A pair of {@link Coder} and {@link FnDataReceiver} which can be registered to receive elements + * for a {@link LogicalEndpoint}. 
+ */ +@AutoValue +public abstract class RemoteOutputReceiver { + public static RemoteOutputReceiver of (Coder coder, FnDataReceiver receiver) { Review comment: Vanilla IntelliJ being IntelliJ. Will fix. Issue Time Tracking --- Worklog Id: (was: 97282) Time Spent: 27h 10m (was: 27h)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97281 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:29 Start Date: 01/May/18 22:29 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185351422 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; + +/** + * A factory that has all job-scoped information, and can be combined with stage-scoped information + * to create a {@link StageBundleFactory}. + * + * Releases all job-scoped resources when closed. + */ +public interface JobBundleFactory extends AutoCloseable { Review comment: And JobScopedBundleCurry. 
But that was mostly because JobBundleOnion made me hungry. Issue Time Tracking --- Worklog Id: (was: 97281) Time Spent: 27h (was: 26h 50m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97280 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:28 Start Date: 01/May/18 22:28 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185351331 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; + +/** + * A factory that has all job-scoped information, and can be combined with stage-scoped information + * to create a {@link StageBundleFactory}. + * + * Releases all job-scoped resources when closed. 
+ */ +public interface JobBundleFactory extends AutoCloseable { Review comment: I briefly considered JobBundleOnion. Issue Time Tracking --- Worklog Id: (was: 97280) Time Spent: 26h 50m (was: 26h 40m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97278 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:27 Start Date: 01/May/18 22:27 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185350969 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; + +/** + * Makes artifacts available to an ArtifactRetrievalService by + * encapsulating runner-specific resources. + */ +public interface ArtifactSource { + + /** + * Get the artifact manifest available from this source. 
+ */ + Manifest getManifest() throws IOException; + + /** + * Get an artifact by its name. + */ + void getArtifact(String name, StreamObserver<ArtifactChunk> responseObserver); Review comment: Ideally we'd be able to write a utility that takes the `InputStream` and produces the stream of chunks of the appropriate type, with the actual service implementation responsible for pushing the bytes that the source makes available. That way we don't have to write multiple sources that know how to get input streams from names but don't know how to turn those streams into chunks, or streams of chunks. Issue Time Tracking --- Worklog Id: (was: 97278) Time Spent: 26h 40m (was: 26.5h)
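Editorial sketch of the kind of utility the review comment above describes: it takes an `InputStream` and pushes fixed-size chunks to a consumer, so a source only has to know how to open a stream by name. The class and method names (`ArtifactChunkerSketch`, `pushChunks`, `chunkSizes`) are hypothetical, and a plain `Consumer<byte[]>` stands in for the gRPC `StreamObserver` of `ArtifactChunk` so that the example stays self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Sketch only: Consumer<byte[]> stands in for a StreamObserver of artifact chunks. */
final class ArtifactChunkerSketch {

  /** Reads the stream to exhaustion, handing each chunk of at most chunkSize bytes to the sink. */
  static void pushChunks(InputStream in, int chunkSize, Consumer<byte[]> sink) throws IOException {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    byte[] buffer = new byte[chunkSize];
    int read;
    while ((read = in.read(buffer)) != -1) {
      if (read > 0) {
        // Copy so the sink never observes the reused buffer.
        sink.accept(Arrays.copyOf(buffer, read));
      }
    }
  }

  /** Convenience for the demo: the size of every chunk produced for an in-memory payload. */
  static List<Integer> chunkSizes(byte[] data, int chunkSize) {
    List<Integer> sizes = new ArrayList<>();
    try (InputStream in = new ByteArrayInputStream(data)) {
      pushChunks(in, chunkSize, chunk -> sizes.add(chunk.length));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(chunkSizes(new byte[10], 4));
  }
}
```

A stream may return fewer bytes than requested on any read, so the only guarantee here is an upper bound on chunk size; a service that needs exactly-sized chunks would have to buffer further.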
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97276&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97276 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:24 Start Date: 01/May/18 22:24 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185350524 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteOutputReceiver.java ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.fn.data.LogicalEndpoint; + +/** + * A pair of {@link Coder} and {@link FnDataReceiver} which can be registered to receive elements + * for a {@link LogicalEndpoint}. 
+ */ +@AutoValue +public abstract class RemoteOutputReceiver { + public static RemoteOutputReceiver of (Coder coder, FnDataReceiver receiver) { Review comment: Funky formatting. Issue Time Tracking --- Worklog Id: (was: 97276) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Time Spent: 26.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97275 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:24 Start Date: 01/May/18 22:24 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185350217 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; + +/** + * A factory that has all job-scoped information, and can be combined with stage-scoped information + * to create a {@link StageBundleFactory}. + * + * Releases all job-scoped resources when closed. + */ +public interface JobBundleFactory extends AutoCloseable { Review comment: Not `StageBundleFactoryFactory`? 
(Please do not do this) Issue Time Tracking --- Worklog Id: (was: 97275) Time Spent: 26.5h (was: 26h 20m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97274&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97274 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:24 Start Date: 01/May/18 22:24 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185350084 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; + +/** + * Makes artifacts available to an ArtifactRetrievalService by + * encapsulating runner-specific resources. + */ +public interface ArtifactSource { + + /** + * Get the artifact manifest available from this source. 
+ */ + Manifest getManifest() throws IOException; + + /** + * Get an artifact by its name. + */ + void getArtifact(String name, StreamObserver<ArtifactChunk> responseObserver); Review comment: Alternatively, this could return an `Iterable`, where the chunks are constructed from the stream by the service implementation. Issue Time Tracking --- Worklog Id: (was: 97274) Time Spent: 26h 20m (was: 26h 10m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97259&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97259 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 22:02 Start Date: 01/May/18 22:02 Worklog Time Spent: 10m Work Description: axelmagn commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-385804172 Just fixed the build errors that were causing problems for Jenkins. Issue Time Tracking --- Worklog Id: (was: 97259) Time Spent: 26h 10m (was: 26h)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=97241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97241 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 21:55 Start Date: 01/May/18 21:55 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185345181 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ## @@ -0,0 +1,13 @@ +package org.apache.beam.runners.fnexecution.control; Review comment: done Issue Time Tracking --- Worklog Id: (was: 97241) Time Spent: 26h (was: 25h 50m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=96975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96975 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 01/May/18 06:39 Start Date: 01/May/18 06:39 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r185174398 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A bundle capable of handling input data elements for a + * {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor bundle descriptor} + * by forwarding them to a remote environment for processing. + * + * When a RemoteBundle is closed, it will block until bundle processing is finished on remote + * resources, and throw an exception if bundle processing has failed. 
+ */ +public interface RemoteBundle extends AutoCloseable { + /** + * Get an id used to represent this bundle. + */ + String getBundleId(); Review comment: done Issue Time Tracking --- Worklog Id: (was: 96975) Time Spent: 25h 50m (was: 25h 40m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=96974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96974 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 01/May/18 06:39
Start Date: 01/May/18 06:39
Worklog Time Spent: 10m
Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r185174383

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java ##

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A bundle capable of handling input data elements for a
+ * {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor bundle descriptor}
+ * by forwarding them to a remote environment for processing.
+ *
+ * When a RemoteBundle is closed, it will block until bundle processing is finished on remote
+ * resources, and throw an exception if bundle processing has failed.
+ */
+public interface RemoteBundle extends AutoCloseable {
+  /**
+   * Get an id used to represent this bundle.
+   */
+  String getBundleId();
+
+  /**
+   * Get a {@link FnDataReceiver receiver} which consumes input elements, forwarding them to the
+   * remote environment.
+   */
+  FnDataReceiver> getInputReceiver();

Review comment: Possibly, for users, but in implementation it will be composing a FnDataReceiver that it receives from a FnDataServer. Since a RemoteBundle does more than just receive data (it also does bookkeeping around bundle-scoped resources on close), I'm not sure it would be the correct abstraction.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 96974)
Time Spent: 25h 40m (was: 25.5h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 25h 40m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
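
Editorial sketch (not part of the original thread): the reply above argues that a RemoteBundle is more than a data receiver, because it hands out a receiver and also does bundle-scoped bookkeeping when it is closed. The following is a minimal, self-contained illustration of that split. Every name in it (`Receiver`, `SketchRemoteBundle`, `InMemoryBundle`, `RemoteBundleSketch`) is a hypothetical stand-in, elements are plain strings rather than windowed values, and nothing here is Beam's actual API or implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a data receiver; this is NOT Beam's FnDataReceiver. */
interface Receiver<T> {
  void accept(T input) throws Exception;
}

/** Hypothetical, simplified mirror of the interface under review. */
interface SketchRemoteBundle extends AutoCloseable {
  String getBundleId();

  Receiver<String> getInputReceiver();
}

/** In-memory bundle: hands out a receiver and does bundle-scoped bookkeeping on close. */
final class InMemoryBundle implements SketchRemoteBundle {
  private final String id;
  private final List<String> forwarded = new ArrayList<>();
  private boolean closed = false;

  InMemoryBundle(String id) {
    this.id = id;
  }

  @Override
  public String getBundleId() {
    return id;
  }

  @Override
  public Receiver<String> getInputReceiver() {
    // The receiver only forwards elements; lifecycle state stays with the bundle.
    return element -> {
      if (closed) {
        throw new IllegalStateException("bundle " + id + " is closed");
      }
      forwarded.add(element);
    };
  }

  @Override
  public void close() {
    // A real implementation would block here until remote processing finishes.
    closed = true;
  }

  List<String> forwarded() {
    return forwarded;
  }

  boolean isClosed() {
    return closed;
  }
}

public class RemoteBundleSketch {
  /** Feeds all elements into one bundle, then closes it via try-with-resources. */
  static InMemoryBundle run(String id, List<String> elements) throws Exception {
    InMemoryBundle bundle = new InMemoryBundle(id);
    try (InMemoryBundle open = bundle) {
      Receiver<String> receiver = open.getInputReceiver();
      for (String element : elements) {
        receiver.accept(element);
      }
    }
    return bundle;
  }

  public static void main(String[] args) throws Exception {
    InMemoryBundle done = run("bundle-1", Arrays.asList("a", "b"));
    System.out.println(done.getBundleId() + " forwarded " + done.forwarded());
  }
}
```

Keeping the receiver as something the bundle returns, rather than something the bundle is, leaves `close()` free to own the completion and cleanup semantics described in the javadoc.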
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=96972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96972 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 01/May/18 06:37
Start Date: 01/May/18 06:37
Worklog Time Spent: 10m
Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r185174137

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StaticBundleFactory.java ##

@@ -0,0 +1,14 @@
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/**
+ * A factory that can create {@link JobBundleFactory JobBundleFactories} from job-scoped resources
+ * provided by an operator.
+ *
+ * This should be a global or singleton resource that caches JobBundleFactories between calls.

Review comment: removed

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 96972)
Time Spent: 25h 20m (was: 25h 10m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 25h 20m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=96973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96973 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 01/May/18 06:37
Start Date: 01/May/18 06:37
Worklog Time Spent: 10m
Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r185174156

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ##

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+/**
+ * A bundle factory scoped to a particular
+ * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the
+ * resources it needs to provide new {@link RemoteBundle RemoteBundles}.
+ *
+ * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to
+ * its lifetime can be cleaned up.
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.

Review comment: fixed

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 96973)
Time Spent: 25.5h (was: 25h 20m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 25.5h
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=96970&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96970 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 01/May/18 06:36
Start Date: 01/May/18 06:36
Worklog Time Spent: 10m
Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r185174098

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ##

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A factory that has all job-scoped information, and can be combined with stage-scoped information
+ * to create a {@link JobBundleFactory}.

Review comment: fixed

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 96970)
Time Spent: 25h (was: 24h 50m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 25h
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=96971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-96971 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 01/May/18 06:36
Start Date: 01/May/18 06:36
Worklog Time Spent: 10m
Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r185174104

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ##

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A factory that has all job-scoped information, and can be combined with stage-scoped information
+ * to create a {@link JobBundleFactory}.
+ */
+public interface JobBundleFactory extends AutoCloseable {

Review comment: done

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 96971)
Time Spent: 25h 10m (was: 25h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Axel Magnuson
> Priority: Major
> Labels: portability
> Time Spent: 25h 10m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95784&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95784 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184561148

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ##

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A factory that has all job-scoped information, and can be combined with stage-scoped information
+ * to create a {@link JobBundleFactory}.
+ */
+public interface JobBundleFactory extends AutoCloseable {

Review comment: Clarify what happens when you close it?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95784)
Time Spent: 24h 10m (was: 24h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24h 10m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95790 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184562343

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ##

@@ -0,0 +1,16 @@
+package org.apache.beam.runners.fnexecution.control;
+
+/**
+ * A bundle factory scoped to a particular
+ * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the
+ * resources it needs to provide new {@link RemoteBundle RemoteBundles}.
+ *
+ * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to
+ * its lifetime can be cleaned up.
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.
+   */
+  RemoteBundle getBundle() throws Exception;

Review comment: Agreed. I'm not sure whether the state handler should be per-stage or per-bundle, but output receivers are definitely per-bundle and should probably be passed as something like `Map>>` keyed by PCollection ids of the bundle's outputs.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95790)
Time Spent: 24h 40m (was: 24.5h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24h 40m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
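
Editorial sketch (not part of the original thread): the review comment above suggests passing output receivers per bundle, as a map keyed by the PCollection ids of the bundle's outputs. The map's type parameters were lost in this archive, so the types below are assumptions. All names (`OutputReceiver`, `SketchStageBundleFactory`, `SketchBundle`, the ids `pc.main` and `pc.side`) are hypothetical, elements are plain strings, and the `emit` method merely simulates a remote environment producing output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical output receiver; NOT a Beam interface. */
interface OutputReceiver<T> {
  void accept(T output) throws Exception;
}

/** Hypothetical factory shape: output receivers are supplied per bundle. */
interface SketchStageBundleFactory {
  SketchBundle getBundle(Map<String, OutputReceiver<String>> outputReceivers) throws Exception;
}

/** Bundle that routes each simulated output to the receiver registered for its id. */
final class SketchBundle implements AutoCloseable {
  private final Map<String, OutputReceiver<String>> outputs;

  SketchBundle(Map<String, OutputReceiver<String>> outputs) {
    this.outputs = outputs;
  }

  /** Simulates the remote environment emitting one element on one output PCollection. */
  void emit(String pcollectionId, String element) throws Exception {
    OutputReceiver<String> receiver = outputs.get(pcollectionId);
    if (receiver == null) {
      throw new IllegalArgumentException("no receiver for " + pcollectionId);
    }
    receiver.accept(element);
  }

  @Override
  public void close() {
    // Nothing to release in this in-memory sketch.
  }
}

public class PerBundleOutputsSketch {
  /** Builds one receiver per output id, runs a single bundle, returns what each id received. */
  static Map<String, List<String>> run() throws Exception {
    Map<String, List<String>> collected = new HashMap<>();
    Map<String, OutputReceiver<String>> receivers = new HashMap<>();
    for (String id : Arrays.asList("pc.main", "pc.side")) {
      List<String> sink = new ArrayList<>();
      collected.put(id, sink);
      receivers.put(id, sink::add);
    }

    SketchStageBundleFactory factory = SketchBundle::new;
    try (SketchBundle bundle = factory.getBundle(receivers)) {
      bundle.emit("pc.main", "x");
      bundle.emit("pc.side", "y");
      bundle.emit("pc.main", "z");
    }
    return collected;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```

Supplying the map at `getBundle` time means two bundles of the same stage can write to different sinks, which is the per-bundle property the comment asks for.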
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95791&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95791 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184561565

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ##

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+/**
+ * A bundle factory scoped to a particular
+ * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the
+ * resources it needs to provide new {@link RemoteBundle RemoteBundles}.
+ *
+ * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to
+ * its lifetime can be cleaned up.
+ */
+public interface StageBundleFactory extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.

Review comment: in this stage

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95791)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24h 40m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95783&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95783 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184561122

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ##

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A factory that has all job-scoped information, and can be combined with stage-scoped information
+ * to create a {@link JobBundleFactory}.

Review comment: to create a StageBundleFactory

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95783)
Time Spent: 24h (was: 23h 50m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24h
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95787&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95787 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184561539

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ##

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+/**
+ * A bundle factory scoped to a particular
+ * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the
+ * resources it needs to provide new {@link RemoteBundle RemoteBundles}.
+ *
+ * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to

Review comment: that the stage

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95787)
Time Spent: 24h 20m (was: 24h 10m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24h 20m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95786 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184561434

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java ##

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A bundle capable of handling input data elements for a
+ * {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor bundle descriptor}
+ * by forwarding them to a remote environment for processing.
+ *
+ * When a RemoteBundle is closed, it will block until bundle processing is finished on remote
+ * resources, and throw an exception if bundle processing has failed.
+ */
+public interface RemoteBundle extends AutoCloseable {
+  /**
+   * Get an id used to represent this bundle.
+   */
+  String getBundleId();

Review comment: Would suggest to rename to simply getId() - "bundle" is implied

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95786)
Time Spent: 24h 20m (was: 24h 10m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24h 20m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95788&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95788 ]

ASF GitHub Bot logged work on BEAM-3327:

Author: ASF GitHub Bot
Created on: 27/Apr/18 00:00
Start Date: 27/Apr/18 00:00
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r184562601

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StaticBundleFactory.java ##

@@ -0,0 +1,14 @@
+package org.apache.beam.runners.fnexecution.control;
+
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+
+/**
+ * A factory that can create {@link JobBundleFactory JobBundleFactories} from job-scoped resources
+ * provided by an operator.
+ *
+ * This should be a global or singleton resource that caches JobBundleFactories between calls.

Review comment: Yeah I think this doesn't need an interface. I'm also not sure whether its implementation per se should be runner-specific, or whether one should pass runner-specific hooks into a general-purpose implementation. For this PR, probably fine to either convert this to a class with a static method, or omit.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 95788)
Time Spent: 24.5h (was: 24h 20m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 24.5h
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95785 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 27/Apr/18 00:00 Start Date: 27/Apr/18 00:00 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184561494 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A bundle capable of handling input data elements for a + * {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor bundle descriptor} + * by forwarding them to a remote environment for processing. + * + * When a RemoteBundle is closed, it will block until bundle processing is finished on remote + * resources, and throw an exception if bundle processing has failed. 
+ */ +public interface RemoteBundle extends AutoCloseable { + /** + * Get an id used to represent this bundle. + */ + String getBundleId(); + + /** + * Get a {@link FnDataReceiver receiver} which consumes input elements, forwarding them to the + * remote environment. + */ + FnDataReceiver> getInputReceiver(); Review comment: Maybe can RemoteBundle _implement_ FnDataReceiver instead of returning it? Seems it would be a bit more concise both for implementation and for users. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95785) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 24h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
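A minimal sketch of the two shapes discussed in this comment, assuming simplified stand-in types. `Receiver`, `ReturningBundle`, `ReceivingBundle` and `InMemoryBundle` are hypothetical names for illustration and are not Beam's actual interfaces. It only shows that a bundle which implements the receiver directly can be fed elements inside a try-with-resources block, with `close()` marking the end of the bundle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoteBundleSketch {
  /** Hypothetical stand-in for a data receiver; not Beam's FnDataReceiver. */
  interface Receiver<T> {
    void accept(T input) throws Exception;
  }

  /** Shape used in the PR: the bundle hands out a receiver. */
  interface ReturningBundle<T> extends AutoCloseable {
    String getBundleId();

    Receiver<T> getInputReceiver();
  }

  /** Shape suggested in the review: the bundle is itself the receiver. */
  interface ReceivingBundle<T> extends Receiver<T>, AutoCloseable {
    String getBundleId();
  }

  /** Toy bundle that records elements in memory instead of forwarding them remotely. */
  static final class InMemoryBundle implements ReceivingBundle<String> {
    final List<String> forwarded = new ArrayList<>();
    boolean closed = false;

    @Override
    public String getBundleId() {
      return "bundle-1";
    }

    @Override
    public void accept(String input) {
      if (closed) {
        throw new IllegalStateException("bundle already closed");
      }
      forwarded.add(input);
    }

    @Override
    public void close() {
      // A real implementation would block here until remote processing finishes.
      closed = true;
    }
  }

  /** Feeds every element to the bundle, then closes it via try-with-resources. */
  static InMemoryBundle processAll(List<String> elements) {
    InMemoryBundle bundle = new InMemoryBundle();
    try (InMemoryBundle b = bundle) {
      for (String element : elements) {
        b.accept(element);
      }
    }
    return bundle;
  }

  public static void main(String[] args) {
    InMemoryBundle done = processAll(Arrays.asList("a", "b"));
    System.out.println(done.forwarded + " closed=" + done.closed);
  }
}
```

With the second shape the caller needs one fewer call per bundle; the trade-off is that the bundle type then carries the receiver's method in its own API.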
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95789 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 27/Apr/18 00:00 Start Date: 27/Apr/18 00:00 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184561093 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; + +/** + * Makes artifacts available to an ArtifactRetrievalService by + * encapsulating runner-specific resources. + */ +public interface ArtifactSource { + + /** + * Get the artifact manifest available from this source. 
+ */ + Manifest getManifest() throws IOException; + + /** + * Get an artifact by its name. + */ + void getArtifact(String name, StreamObserver responseObserver); Review comment: Maybe it's possible to implement a `getInputStream()` method? I'm not sure it's worth it though, since this class is directly used to implement ArtifactRetrievalService, which speaks in the same StreamObserver, right? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95789) Time Spent: 24.5h (was: 24h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 24.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95792 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 27/Apr/18 00:00 Start Date: 27/Apr/18 00:00 Worklog Time Spent: 10m Work Description: jkff commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-384823613 CC: @angoenka This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95792) Time Spent: 24h 50m (was: 24h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 24h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95700 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 26/Apr/18 18:30 Start Date: 26/Apr/18 18:30 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184488836 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,16 @@ +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. + */ +public interface StageBundleFactory extends AutoCloseable { + /** + * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. + */ + RemoteBundle getBundle() throws Exception; Review comment: This is something I had not considered. This also makes me wonder if stateRequestHandlers can actually be scoped to the operator, or if they need to be scoped to the bundle (which seems to have the lifetime of a partition, not an operator). At any rate, any partition-scoped resources should probably be passed to `getBundle` in the `StageBundleFactory`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95700) Time Spent: 23h 50m (was: 23h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 23h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95699 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 26/Apr/18 18:29 Start Date: 26/Apr/18 18:29 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184488836 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,16 @@ +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. + */ +public interface StageBundleFactory extends AutoCloseable { + /** + * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. + */ + RemoteBundle getBundle() throws Exception; Review comment: This is something I had not considered. This also makes me wonder if stateRequestHandlers can actually be scoped to the operator, or if they need to be scoped to the bundle (which seems to have the lifetime of a partition, not an operator). At any rate, any partition-scoped resources should probably be passed to `getBundle` by the StageBundleFactory. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95699) Time Spent: 23h 40m (was: 23.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 23h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95696 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 26/Apr/18 18:24 Start Date: 26/Apr/18 18:24 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184487269 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; + +/** + * Makes artifacts available to an ArtifactRetrievalService by + * encapsulating runner-specific resources. + */ +public interface ArtifactSource { + + /** + * Get the artifact manifest available from this source. 
+ */ + Manifest getManifest() throws IOException; + + /** + * Get an artifact by its name. + */ + void getArtifact(String name, StreamObserver responseObserver); Review comment: I can't think of a more reasonable alternative. I'll add a doc comment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95696) Time Spent: 23.5h (was: 23h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 23.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95693 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 26/Apr/18 18:20 Start Date: 26/Apr/18 18:20 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184486046 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StaticBundleFactory.java ## @@ -0,0 +1,14 @@ +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; + +/** + * A factory that can create {@link JobBundleFactory JobBundleFactories} from job-scoped resources + * provided by an operator. + * + * This should be a global or singleton resource that caches JobBundleFactories between calls. Review comment: I could just omit this class until I have a full implementation for it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95693) Time Spent: 23h 20m (was: 23h 10m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 23h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95692 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 26/Apr/18 18:19 Start Date: 26/Apr/18 18:19 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184485820 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StaticBundleFactory.java ## @@ -0,0 +1,14 @@ +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; + +/** + * A factory that can create {@link JobBundleFactory JobBundleFactories} from job-scoped resources + * provided by an operator. + * + * This should be a global or singleton resource that caches JobBundleFactories between calls. Review comment: The StaticBundleFactory itself is meant to be a global singleton, caching one JobBundleFactory per job. While I agree that a class would make this clearer, I also think this is the most likely type to need a Flink/Spark/RR specific implementation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95692) Time Spent: 23h 10m (was: 23h) > Add abstractions to manage Environment Instance lifecycles. 
> --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 23h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
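A rough sketch of the "global singleton caching one factory per job" idea from this thread, combined with the suggestion of passing a runner-specific hook into a general-purpose implementation. `JobFactory`, `Cache` and the string job id are hypothetical placeholders, not types from the PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class JobFactoryCacheSketch {
  /** Hypothetical stand-in for a job-scoped bundle factory. */
  static final class JobFactory {
    final String jobId;

    JobFactory(String jobId) {
      this.jobId = jobId;
    }
  }

  /** General-purpose cache; the runner supplies only the creation hook. */
  static final class Cache {
    private final ConcurrentMap<String, JobFactory> byJob = new ConcurrentHashMap<>();
    private final Function<String, JobFactory> runnerHook;

    Cache(Function<String, JobFactory> runnerHook) {
      this.runnerHook = runnerHook;
    }

    /** Returns the cached factory for the job, creating it on first use. */
    JobFactory forJob(String jobId) {
      return byJob.computeIfAbsent(jobId, runnerHook);
    }

    int size() {
      return byJob.size();
    }
  }

  public static void main(String[] args) {
    Cache cache = new Cache(JobFactory::new);
    System.out.println(cache.forJob("job-a") == cache.forJob("job-a"));
  }
}
```

Whether the cache itself should be an interface with runner-specific implementations, or a single class parameterized this way, is exactly the open question in the comments above; the sketch only illustrates the second option.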
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95294&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95294 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 23:02 Start Date: 25/Apr/18 23:02 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184229759 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java ## @@ -0,0 +1,16 @@ +package org.apache.beam.runners.fnexecution.control; + +/** + * A bundle factory scoped to a particular + * {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}, which has all of the + * resources it needs to provide new {@link RemoteBundle RemoteBundles}. + * + * Closing a StageBundleFactory signals that a a stage has completed and any resources bound to + * its lifetime can be cleaned up. + */ +public interface StageBundleFactory extends AutoCloseable { + /** + * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage. + */ + RemoteBundle getBundle() throws Exception; Review comment: How do clients register bundle output receivers? Note that when interacting with the `SdkHarnessClient` directly, this is done via `newBundle`: https://github.com/apache/beam/blob/5bb482f536e75bfacac91a546ab3db843dd2a25e/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java#L114 Note that we cannot register output receivers at operator startup because the output receivers are attached to their respective partition contexts (i.e., in the case of Flink, outputs are sunk to the `Collector`, which can only be used for the lifetime of a partition (`mapPartition` function call)). This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95294) Time Spent: 22h 40m (was: 22.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 22h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
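One way to read the concern above is that the output receiver has to be supplied each time a bundle is requested, because it is only valid for the lifetime of a partition. The sketch below illustrates that shape under simplified assumptions: `Bundle`, `StageFactory`, `UpperCaseStage` and `mapPartition` are hypothetical names, the list stands in for a partition-scoped collector, and nothing here is the signature that was eventually merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

public class PartitionScopedOutputSketch {
  /** Hypothetical bundle: accepts input elements and is closed when the partition ends. */
  interface Bundle extends AutoCloseable {
    void accept(String element);

    @Override
    void close();
  }

  /** Hypothetical stage-scoped factory: the output receiver is passed per bundle. */
  interface StageFactory {
    Bundle getBundle(Consumer<String> outputReceiver);
  }

  /** Toy stage that upper-cases each element locally instead of calling a remote harness. */
  static final class UpperCaseStage implements StageFactory {
    @Override
    public Bundle getBundle(Consumer<String> outputReceiver) {
      return new Bundle() {
        @Override
        public void accept(String element) {
          outputReceiver.accept(element.toUpperCase(Locale.ROOT));
        }

        @Override
        public void close() {
          // Nothing to flush in this toy implementation.
        }
      };
    }
  }

  /** Mimics a per-partition call: the collector exists only inside this method. */
  static List<String> mapPartition(StageFactory stage, List<String> partition) {
    List<String> collector = new ArrayList<>();
    try (Bundle bundle = stage.getBundle(collector::add)) {
      for (String element : partition) {
        bundle.accept(element);
      }
    }
    return collector;
  }

  public static void main(String[] args) {
    System.out.println(mapPartition(new UpperCaseStage(), Arrays.asList("a", "b")));
  }
}
```

The factory itself can then live as long as the operator, while each bundle and its receiver are created and discarded per partition.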
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95293 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 23:02 Start Date: 25/Apr/18 23:02 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184227583 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactory.java ## @@ -0,0 +1,13 @@ +package org.apache.beam.runners.fnexecution.control; Review comment: Please add a license header. Same applies to the other new files below. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95293) Time Spent: 22.5h (was: 22h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 22.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95295&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95295 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 23:02 Start Date: 25/Apr/18 23:02 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184228593 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest; + +/** + * Makes artifacts available to an ArtifactRetrievalService by + * encapsulating runner-specific resources. + */ +public interface ArtifactSource { + + /** + * Get the artifact manifest available from this source. 
+ */ + Manifest getManifest() throws IOException; + + /** + * Get an artifact by its name. + */ + void getArtifact(String name, StreamObserver responseObserver); Review comment: Do we want to make clients interact directly with StreamObserver? This may be the most reasonable approach, but it also increases the burden on implementors due to gRPC semantics. For example, clients may need to wrap code to ensure that `onCompleted` or `onError` is always called to make sure RPCs complete in a timely fashion. If this is necessary, please add a doc comment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95295) Time Spent: 22h 50m (was: 22h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 22h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
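To illustrate the burden mentioned above, here is a minimal sketch of a wrapper that guarantees exactly one terminal call, assuming a hand-written `Observer` interface that mirrors the three methods of gRPC's `StreamObserver` (`onNext`, `onError`, `onCompleted`). `ChunkWriter`, `streamSafely` and `Recording` are hypothetical helpers, and the sketch deliberately avoids a gRPC dependency.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminatingObserverSketch {
  /** Hypothetical mirror of the three StreamObserver callbacks. */
  interface Observer<T> {
    void onNext(T value);

    void onError(Throwable t);

    void onCompleted();
  }

  /** Hypothetical producer that writes chunks and may fail part-way through. */
  interface ChunkWriter<T> {
    void writeTo(Observer<T> out) throws Exception;
  }

  /** Runs the writer and always finishes the stream with onError or onCompleted. */
  static <T> void streamSafely(ChunkWriter<T> writer, Observer<T> out) {
    try {
      writer.writeTo(out);
    } catch (Exception e) {
      out.onError(e);
      return;
    }
    out.onCompleted();
  }

  /** Test double that records the sequence of callbacks it receives. */
  static final class Recording implements Observer<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onNext(String value) {
      events.add("next:" + value);
    }

    @Override
    public void onError(Throwable t) {
      events.add("error:" + t.getMessage());
    }

    @Override
    public void onCompleted() {
      events.add("completed");
    }
  }

  public static void main(String[] args) {
    Recording recording = new Recording();
    TerminatingObserverSketch.<String>streamSafely(o -> o.onNext("chunk-1"), recording);
    System.out.println(recording.events);
  }
}
```

Without a helper of this kind, every implementor of `getArtifact` would need the same try/catch discipline, which is the reason a doc comment was requested.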
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95296 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 23:02 Start Date: 25/Apr/18 23:02 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r184230277 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StaticBundleFactory.java ## @@ -0,0 +1,14 @@ +package org.apache.beam.runners.fnexecution.control; + +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; + +/** + * A factory that can create {@link JobBundleFactory JobBundleFactories} from job-scoped resources + * provided by an operator. + * + * This should be a global or singleton resource that caches JobBundleFactories between calls. Review comment: Is the `StaticBundleFactory` itself meant to be a global singleton, or is the returned result meant to be singleton-scoped (per job)? This might be clearer as a class rather than an interface. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95296) Time Spent: 23h (was: 22h 50m) > Add abstractions to manage Environment Instance lifecycles. 
> --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 23h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95262&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95262 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 21:49 Start Date: 25/Apr/18 21:49 Worklog Time Spent: 10m Work Description: axelmagn commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-384445285 Revised commits that do away with `SdkHarnessManager` in favor of `{Static,Job,Stage}BundleFactory` cc: @jkff (who contributed to the new interface design) ptal: @tgroh (there were additional contributions from the spark runner team, and this represents a synthesis of all the input so far) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95262) Time Spent: 22h 20m (was: 22h 10m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 22h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95220&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95220 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 20:11 Start Date: 25/Apr/18 20:11 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-384418289 retest this please ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95220) Time Spent: 22h 10m (was: 22h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 22h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95219 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 20:10 Start Date: 25/Apr/18 20:10 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-384418200 please retest this This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95219) Time Spent: 22h (was: 21h 50m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 22h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95209&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95209 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 19:53 Start Date: 25/Apr/18 19:53 Worklog Time Spent: 10m Work Description: tgroh commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-384413378 At this point that executor should be dead, as far as I know This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95209) Time Spent: 21h 50m (was: 21h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 21h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=95174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-95174 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 25/Apr/18 18:40 Start Date: 25/Apr/18 18:40 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#issuecomment-384391892 There seem to be a lot of environment failures on the beam8 executor. Can we kill/restart it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 95174) Time Spent: 21h 40m (was: 21.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 21h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94636&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94636 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 15:37 Start Date: 24/Apr/18 15:37 Worklog Time Spent: 10m Work Description: tgroh closed pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index 462752f684a..258987089df 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -39,3 +39,18 @@ dependencies { testCompile library.java.mockito_core testCompile library.java.slf4j_simple } + +test { + useJUnit { +// Exclude tests that need Docker. 
+excludeCategories "org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker" + } +} + +task testDocker(type: Test) { + group = "Verification" + description = "Runs Docker tests" + useJUnit { +includeCategories "org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker" + } +} diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index 515801538f7..bdf69c46a07 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -32,6 +32,53 @@ jar + + + +org.apache.maven.plugins +maven-surefire-plugin + + + + org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker + + + + + + + + + docker-tests + false + + + +org.apache.maven.plugins +maven-surefire-plugin + + +docker-tests +integration-test + + test + + + + org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker + + + + + + + + + + + org.apache.beam diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java index 71089e33f5b..a8dde1514eb 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProvider.java @@ -63,7 +63,14 @@ public static HeaderAccessor getHeaderAccessor() { @Override /** This method should be called from the request method. */ public String getSdkWorkerId() { - return SDK_WORKER_CONTEXT_KEY.get(); + // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Some harnesses may not set the worker + // id header. Remove the null check below once this is fixed. 
+ String workerId = SDK_WORKER_CONTEXT_KEY.get(); + if (workerId == null) { +return ""; + } else { +return workerId; + } } } } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java index e348dc22cac..c343fda5114 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java @@ -17,16 +17,64 @@ */ package org.apache.beam.runners.fnexecution.control; -import org.apache.beam.sdk.fn.function.ThrowingConsumer; -import org.apache.beam.sdk.util.ThrowingSupplier; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.ThreadSafe; -/** Control client pool that exposes a source and sink of control clients. */ -public interface ControlClientPool { +/** + * A pool of control clients that brokers incoming SDK harness connections (in the form of {@link + * InstructionRequestHandler InstructionRequestHandlers}. + * + * Incoming instruction handlers usually come from the control plane gRPC service. Typical use: + * + * + * // Within owner of the pool, who may or may not o
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94402 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:53 Start Date: 24/Apr/18 00:53 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#issuecomment-383767932 History should be clean now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94402) Time Spent: 21h 20m (was: 21h 10m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 21h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94396&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94396 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:39 Start Date: 24/Apr/18 00:39 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183573933 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java ## @@ -17,16 +17,58 @@ */ package org.apache.beam.runners.fnexecution.control; -import org.apache.beam.sdk.fn.function.ThrowingConsumer; -import org.apache.beam.sdk.util.ThrowingSupplier; +import javax.annotation.concurrent.ThreadSafe; -/** Control client pool that exposes a source and sink of control clients. */ -public interface ControlClientPool { +/** + * A pool of control clients that brokers incoming SDK harness connections (in the form of {@link + * InstructionRequestHandler InstructionRequestHandlers}. + * + * Incoming instruction handlers usually come from the control plane gRPC service. Typical use: + * + * + * // Within owner of the pool, who may or may not own the control plane server as well + * ControlClientPool pool = ... + * FnApiControlClientPoolService service = + * FnApiControlClientPoolService.offeringClientsToSink(pool.getSink(), headerAccessor) + * // Incoming gRPC control connections will now be added to the client pool. + * + * // Within code that interacts with the instruction handler. The get call blocks until an + * // incoming client is available: + * ControlClientSource clientSource = ... InstructionRequestHandler + * instructionHandler = clientSource.get("worker-id"); + * + * + * All {@link ControlClientPool} must be thread-safe. + */ +@ThreadSafe +public interface ControlClientPool { + + /** Sink for control clients. 
*/ + Sink getSink(); /** Source of control clients. */ - ThrowingSupplier getSource(); + Source getSource(); - /** Sink for control clients. */ - ThrowingConsumer getSink(); + /** A sink for {@link InstructionRequestHandler InstructionRequestHandlers} keyed by worker id. */ + @FunctionalInterface + interface Sink { + +/** + * Puts an {@link InstructionRequestHandler} into a client pool. Worker ids must be unique per + * pool. + */ +void put(String workerId, InstructionRequestHandler instructionHandler) throws Exception; + } + + /** A source of {@link InstructionRequestHandler InstructionRequestHandlers}. */ + @FunctionalInterface + interface Source { +/** + * Retrieves the {@link InstructionRequestHandler} for the given worker id, blocking until + * available. Worker ids must be unique per pool. A given worker id must not be requested Review comment: Yes, the call will never return if this worker is never made available or is never explicitly failed. I'll add an explicit comment and rename the method. I ended up calling this `take`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94396) Time Spent: 21h 10m (was: 21h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 21h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
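The blocking `take` behaviour discussed in the review comment above can be illustrated with a small stand-alone sketch. This is not the Beam implementation: `SketchHandler` and `SketchClientPool` are hypothetical names standing in for `InstructionRequestHandler` and a `ControlClientPool`, and the choice of one `CompletableFuture` per worker id is an assumption made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for InstructionRequestHandler; not Beam's interface.
interface SketchHandler {
  String describe();
}

// Illustrative pool: one future per worker id, completed by put and awaited by take.
class SketchClientPool {
  private final ConcurrentMap<String, CompletableFuture<SketchHandler>> slots =
      new ConcurrentHashMap<>();

  private CompletableFuture<SketchHandler> slot(String workerId) {
    return slots.computeIfAbsent(workerId, id -> new CompletableFuture<>());
  }

  // Sink side: registers a handler; a second put for the same id is rejected.
  void put(String workerId, SketchHandler handler) {
    if (!slot(workerId).complete(handler)) {
      throw new IllegalStateException("Duplicate worker id: " + workerId);
    }
  }

  // Source side: blocks until a handler for this id arrives. As the review
  // comment notes, this never returns if the worker never connects.
  SketchHandler take(String workerId) {
    return slot(workerId).join();
  }
}

class SketchClientPoolDemo {
  public static void main(String[] args) throws Exception {
    SketchClientPool pool = new SketchClientPool();
    // The consumer starts waiting before the worker has registered.
    CompletableFuture<SketchHandler> taken =
        CompletableFuture.supplyAsync(() -> pool.take("worker-1"));
    pool.put("worker-1", () -> "handler for worker-1");
    System.out.println(taken.get(5, TimeUnit.SECONDS).describe());
  }
}
```

Keying a future by worker id lets `put` and `take` arrive in either order, which is one way to satisfy the "worker ids must be unique per pool" contract quoted in the diff; a real pool would also need a way to fail a pending `take`.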
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94385 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183564671 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** A docker command wrapper. Simplifies communications with the Docker daemon. */ +class DockerCommand { Review comment: The class is meant to wrap a single Docker executable and be used in a non-static manner, so I think singular makes sense here. I can see how the plural might make sense here, but it may also be confusing in this context. (Note: This changed from `DockerWrapper` to `DockerCommand` based on https://github.com/apache/beam/pull/5189#discussion_r18319). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94385) Time Spent: 20.5h (was: 20h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94387&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94387 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183567148 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java ## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned + * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not + * thread-safe. + */ +public class DockerEnvironmentFactory implements EnvironmentFactory { + + private static final Logger LOG = LoggerFactory.getLogger(DockerEnvironmentFactory.class); + + public static DockerEnvironmentFactory forServices( + DockerCommand docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + ControlClientPool.Source clientSource, + // TODO: Refine this to IdGenerator when we determine where that should live. 
+ Supplier idGenerator) { +return new DockerEnvironmentFactory( +docker, +controlServiceServer, +loggingServiceServer, +retrievalServiceServer, +provisioningServiceServer, +idGenerator, +clientSource); + } + + private final DockerCommand docker; + private final GrpcFnServer controlServiceServer; + private final GrpcFnServer loggingServiceServer; + private final GrpcFnServer retrievalServiceServer; + private final GrpcFnServer provisioningServiceServer; + private final Supplier idGenerator; + private final ControlClientPool.Source clientSource; + + private DockerEnvironmentFactory( + DockerCommand docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + Supplier idGenerator, + ControlClientPool.Source clientSource) { +this.docker = docker; +this.controlServiceServer = controlServiceServer; +this.loggingServiceServer = loggingServiceServer; +this.retrievalServiceServer = retrievalServiceServer; +this.provisioningServiceServer = provisioningServiceServer; +this.idGenerator = idGenerator; +this.clientSource = clientSource; + } + + /** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */ + @Override + public RemoteEnvironment createEnvironment(Environment environment) throws Exception { +String workerId = idGenerator.get(); + +// Prepare docker invocation. +Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory"); +Path semiPersistentD
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94384&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94384 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183564726 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** A docker command wrapper. Simplifies communications with the Docker daemon. */ +class DockerCommand { + // TODO: Should we require 64-character container ids? Docker technically allows abbreviated ids, + // but we _should_ always capture full ids. + private static final Pattern CONTAINER_ID_PATTERN = Pattern.compile("\\p{XDigit}{64}"); + + static DockerCommand forCommand(String dockerExecutable, Duration commandTimeout) { Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94384) Time Spent: 20h 20m (was: 20h 10m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
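The TODO in the quoted diff asks whether only full 64-character container ids should be accepted. A minimal sketch of what the quoted `CONTAINER_ID_PATTERN` accepts and rejects follows; the class name `ContainerIdCheckSketch` and the helper `isFullContainerId` are hypothetical and only the regular expression itself is taken from the diff.

```java
import java.util.Collections;
import java.util.regex.Pattern;

class ContainerIdCheckSketch {
  // Same expression as in the quoted diff: exactly 64 hexadecimal digits.
  private static final Pattern CONTAINER_ID_PATTERN = Pattern.compile("\\p{XDigit}{64}");

  // Hypothetical helper: true only when the whole string is a full-length id.
  static boolean isFullContainerId(String candidate) {
    return CONTAINER_ID_PATTERN.matcher(candidate).matches();
  }

  public static void main(String[] args) {
    // Build a 64-character hex string from eight copies of an 8-character chunk.
    String fullId = String.join("", Collections.nCopies(8, "0123abcd"));
    String abbreviatedId = fullId.substring(0, 12);
    System.out.println(isFullContainerId(fullId));
    System.out.println(isFullContainerId(abbreviatedId));
  }
}
```

Because `matches()` anchors the pattern to the whole input, an abbreviated id is rejected even though Docker itself would accept it, which is the trade-off the TODO raises.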
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94389&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94389 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183565840 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with an the {@link InstructionRequestHandler} that it uses to do so. 
+ */ +@ThreadSafe +class DockerContainerEnvironment implements RemoteEnvironment { + + static DockerContainerEnvironment create( + DockerCommand docker, + Environment environment, + String containerId, + InstructionRequestHandler instructionHandler) { +return new DockerContainerEnvironment(docker, environment, containerId, instructionHandler); + } + + private final Object lock = new Object(); + private final DockerCommand docker; + private final Environment environment; + private final String containerId; + private final InstructionRequestHandler instructionHandler; + + private DockerContainerEnvironment( + DockerCommand docker, + Environment environment, + String containerId, + InstructionRequestHandler instructionHandler) { +this.docker = docker; +this.environment = environment; +this.containerId = containerId; +this.instructionHandler = instructionHandler; + } + + @Override + public Environment getEnvironment() { +return environment; + } + + @Override + public InstructionRequestHandler getInstructionRequestHandler() { +return instructionHandler; + } + + /** + * Closes this remote docker environment. The associated {@link InstructionRequestHandler} should + * not be used after calling this. + */ + @Override + public void close() throws Exception { +synchronized (lock) { Review comment: This was a result of https://github.com/apache/beam/pull/5189#discussion_r183470809. The net result is that it's generally unsafe for multiple threads to close a given docker environment. I was hesitant to provide synchronization here to emphasize the fact that there should only be a single owner, but @tgroh believed it was easier to understand if all methods were thread-safe. I'll make it idempotent and add an internal and javadoc comment. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94389) Time Spent: 21h (was: 20h 50m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 21h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This me
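The idempotent, synchronized `close()` described in the review comment above could look roughly like the following. This is a sketch under stated assumptions, not the merged code: `IdempotentCloseSketch` is a hypothetical class, and the `Runnable` passed to it stands in for the `docker.killContainer(containerId)` call in the quoted diff.

```java
class IdempotentCloseSketch implements AutoCloseable {
  private final Object lock = new Object();
  // Hypothetical stand-in for killing the container and closing the handler.
  private final Runnable releaseResources;
  private boolean closed = false;

  IdempotentCloseSketch(Runnable releaseResources) {
    this.releaseResources = releaseResources;
  }

  @Override
  public void close() {
    synchronized (lock) {
      if (closed) {
        // A second (or concurrent) close is a no-op.
        return;
      }
      // Assumption of this sketch: mark closed first, so a failed release is not retried.
      closed = true;
      releaseResources.run();
    }
  }

  public static void main(String[] args) {
    int[] releases = {0};
    IdempotentCloseSketch env = new IdempotentCloseSketch(() -> releases[0]++);
    env.close();
    env.close();
    System.out.println("releases: " + releases[0]);
  }
}
```

Holding the lock for the whole release keeps concurrent callers from racing to kill the same container, at the cost of blocking them until the first close finishes.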
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94386&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94386 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183568223 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DockerCommand}. 
*/ +@Category(NeedsDocker.class) +@RunWith(JUnit4.class) +public class DockerCommandTest { + + @Test + public void helloWorld() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("hello-world", Collections.emptyList()); +System.out.printf("Started container: %s%n", container); + } + + @Test + public void killContainer() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("debian", Arrays.asList("/bin/bash", "-c", "sleep 60")); +docker.killContainer(container); Review comment: Done. I used a manual StopWatch rather than a method-level timeout annotation to make sure we're timing the kill command and not startup time (which may include image download, etc.). I've increased the docker command timeout itself to allow this to run a bit longer. Not sure if it's appropriate to have such a long timeout though. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94386) Time Spent: 20h 40m (was: 20.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
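The comment above distinguishes timing only the kill command from timing the whole test method, which would also count container startup. A minimal sketch of that idea follows. The Timed class and its timeOf method are hypothetical names introduced here for illustration; they are not part of the PR, and the PR's actual stopwatch code is not shown in this message.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical helper (not from the PR): measures how long one action takes. */
final class Timed {
  private Timed() {}

  /**
   * Runs the action once and returns the elapsed time. System.nanoTime() is
   * monotonic, so the result is not affected by wall-clock adjustments.
   */
  static Duration timeOf(Callable<?> action) throws Exception {
    long start = System.nanoTime();
    action.call();
    return Duration.ofNanos(System.nanoTime() - start);
  }
}
```

In a test like killContainer, the runImage call would stay outside the timed region and only the killContainer call would be wrapped in timeOf, so that image download and startup are excluded from the measured duration before it is compared against a bound such as 60 seconds.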
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94383 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183566150 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with an the {@link InstructionRequestHandler} that it uses to do so. 
+ */ +@ThreadSafe +class DockerContainerEnvironment implements RemoteEnvironment { + + static DockerContainerEnvironment create( + DockerCommand docker, + Environment environment, + String containerId, + InstructionRequestHandler instructionHandler) { +return new DockerContainerEnvironment(docker, environment, containerId, instructionHandler); + } + + private final Object lock = new Object(); + private final DockerCommand docker; + private final Environment environment; + private final String containerId; + private final InstructionRequestHandler instructionHandler; + + private DockerContainerEnvironment( + DockerCommand docker, + Environment environment, + String containerId, + InstructionRequestHandler instructionHandler) { +this.docker = docker; +this.environment = environment; +this.containerId = containerId; +this.instructionHandler = instructionHandler; + } + + @Override + public Environment getEnvironment() { +return environment; + } + + @Override + public InstructionRequestHandler getInstructionRequestHandler() { +return instructionHandler; + } + + /** + * Closes this remote docker environment. The associated {@link InstructionRequestHandler} should + * not be used after calling this. + */ + @Override + public void close() throws Exception { +synchronized (lock) { Review comment: Side note: in general, it is safe to call `docker kill` multiple times, but subsequent calls will throw exceptions due to non-zero exit codes. I'm changing it such that only the first invocation does anything. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94383) Time Spent: 20h 20m (was: 20h 10m) > Add abstractions to manage Environment Instance lifecycles. 
> --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
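The side note above says that only the first invocation of close() should do anything. One way to sketch that behavior, assuming an atomic flag rather than the synchronized block shown in the diff, is below. CloseOnce and its Cleanup interface are hypothetical names for illustration only; the Cleanup callback stands in for work such as docker.killContainer(containerId).

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch (not from the PR): an AutoCloseable whose cleanup runs at most once. */
final class CloseOnce implements AutoCloseable {

  /** Stand-in for the real cleanup, for example killing a container. */
  interface Cleanup {
    void run() throws Exception;
  }

  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final Cleanup cleanup;

  CloseOnce(Cleanup cleanup) {
    this.cleanup = cleanup;
  }

  @Override
  public void close() throws Exception {
    // compareAndSet succeeds for exactly one caller, even under contention,
    // so later calls return without repeating the cleanup.
    if (closed.compareAndSet(false, true)) {
      cleanup.run();
    }
  }
}
```

Note the trade-off against a lock: with the flag, a second caller returns immediately and does not wait for the first caller's cleanup to finish. If callers need to know that cleanup has completed when close() returns, a synchronized block with a boolean guard would be the more suitable variant.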
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94388&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94388 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 24/Apr/18 00:13 Start Date: 24/Apr/18 00:13 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183571040 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DockerCommand}. 
*/ +@Category(NeedsDocker.class) +@RunWith(JUnit4.class) +public class DockerCommandTest { + + @Test + public void helloWorld() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("hello-world", Collections.emptyList()); +System.out.printf("Started container: %s%n", container); + } + + @Test + public void killContainer() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("debian", Arrays.asList("/bin/bash", "-c", "sleep 60")); +docker.killContainer(container); + } + + private static DockerCommand getWrapper() { +return DockerCommand.forCommand("docker", Duration.ofMillis(1)); + } Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94388) Time Spent: 20h 50m (was: 20h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94353 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562815 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java ## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned + * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not + * thread-safe. + */ +public class DockerEnvironmentFactory implements EnvironmentFactory { + + private static final Logger LOG = LoggerFactory.getLogger(DockerEnvironmentFactory.class); + + public static DockerEnvironmentFactory forServices( + DockerCommand docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + ControlClientPool.Source clientSource, + // TODO: Refine this to IdGenerator when we determine where that should live. 
+ Supplier idGenerator) { +return new DockerEnvironmentFactory( +docker, +controlServiceServer, +loggingServiceServer, +retrievalServiceServer, +provisioningServiceServer, +idGenerator, +clientSource); + } + + private final DockerCommand docker; + private final GrpcFnServer controlServiceServer; + private final GrpcFnServer loggingServiceServer; + private final GrpcFnServer retrievalServiceServer; + private final GrpcFnServer provisioningServiceServer; + private final Supplier idGenerator; + private final ControlClientPool.Source clientSource; + + private DockerEnvironmentFactory( + DockerCommand docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + Supplier idGenerator, + ControlClientPool.Source clientSource) { +this.docker = docker; +this.controlServiceServer = controlServiceServer; +this.loggingServiceServer = loggingServiceServer; +this.retrievalServiceServer = retrievalServiceServer; +this.provisioningServiceServer = provisioningServiceServer; +this.idGenerator = idGenerator; +this.clientSource = clientSource; + } + + /** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */ + @Override + public RemoteEnvironment createEnvironment(Environment environment) throws Exception { +String workerId = idGenerator.get(); + +// Prepare docker invocation. +Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory"); +Path semiPersistentDire
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94358 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562828 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ControlClientPool.java ## @@ -17,16 +17,58 @@ */ package org.apache.beam.runners.fnexecution.control; -import org.apache.beam.sdk.fn.function.ThrowingConsumer; -import org.apache.beam.sdk.util.ThrowingSupplier; +import javax.annotation.concurrent.ThreadSafe; -/** Control client pool that exposes a source and sink of control clients. */ -public interface ControlClientPool { +/** + * A pool of control clients that brokers incoming SDK harness connections (in the form of {@link + * InstructionRequestHandler InstructionRequestHandlers}. + * + * Incoming instruction handlers usually come from the control plane gRPC service. Typical use: + * + * + * // Within owner of the pool, who may or may not own the control plane server as well + * ControlClientPool pool = ... + * FnApiControlClientPoolService service = + * FnApiControlClientPoolService.offeringClientsToSink(pool.getSink(), headerAccessor) + * // Incoming gRPC control connections will now be added to the client pool. + * + * // Within code that interacts with the instruction handler. The get call blocks until an + * // incoming client is available: + * ControlClientSource clientSource = ... InstructionRequestHandler + * instructionHandler = clientSource.get("worker-id"); + * + * + * All {@link ControlClientPool} must be thread-safe. + */ +@ThreadSafe +public interface ControlClientPool { + + /** Sink for control clients. 
*/ + Sink getSink(); /** Source of control clients. */ - ThrowingSupplier getSource(); + Source getSource(); - /** Sink for control clients. */ - ThrowingConsumer getSink(); + /** A sink for {@link InstructionRequestHandler InstructionRequestHandlers} keyed by worker id. */ + @FunctionalInterface + interface Sink { + +/** + * Puts an {@link InstructionRequestHandler} into a client pool. Worker ids must be unique per + * pool. + */ +void put(String workerId, InstructionRequestHandler instructionHandler) throws Exception; + } + + /** A source of {@link InstructionRequestHandler InstructionRequestHandlers}. */ + @FunctionalInterface + interface Source { +/** + * Retrieves the {@link InstructionRequestHandler} for the given worker id, blocking until + * available. Worker ids must be unique per pool. A given worker id must not be requested Review comment: This gives the impression that if the worker never becomes available, the call never returns. Is this the case? Also, minor note: these semantics (single get() and put() per workerId) make me think that perhaps "offer/remove" would be better terminology than "get/put" - or, at least, "remove" would be better than "get". This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94358) Time Spent: 20h (was: 19h 50m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
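The review comment above raises two points: whether a blocking get can wait forever, and whether "offer/remove" describes the single-use semantics better. A minimal sketch of a keyed hand-off with a bounded wait is given below to make the discussion concrete. KeyedHandoff, offer, and remove are hypothetical names chosen for illustration; this is not Beam's ControlClientPool implementation, and the timeout parameter is an assumption that does not appear in the quoted interface.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Beam's implementation): single-use hand-off keyed by id. */
final class KeyedHandoff<T> {
  // One future per id; whichever side arrives first creates the slot.
  private final ConcurrentMap<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

  /** Offers a value for the id; fails if a value is already waiting under that id. */
  void offer(String id, T value) {
    CompletableFuture<T> slot = pending.computeIfAbsent(id, k -> new CompletableFuture<>());
    if (!slot.complete(value)) {
      throw new IllegalStateException("Value already offered for id " + id);
    }
  }

  /**
   * Removes and returns the value for the id, waiting at most the given timeout.
   * Throws java.util.concurrent.TimeoutException if nothing is offered in time.
   */
  T remove(String id, Duration timeout) throws Exception {
    CompletableFuture<T> slot = pending.computeIfAbsent(id, k -> new CompletableFuture<>());
    try {
      return slot.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
    } finally {
      // Drop the slot whether the wait succeeded or timed out.
      pending.remove(id, slot);
    }
  }
}
```

With a bounded wait, a worker that never connects surfaces as a TimeoutException in the caller instead of a hung thread. The sketch relies on the same contract stated in the quoted Javadoc: ids are unique per pool and each id is requested at most once.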
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94359 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562823 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DockerCommand}. 
*/ +@Category(NeedsDocker.class) +@RunWith(JUnit4.class) +public class DockerCommandTest { + + @Test + public void helloWorld() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("hello-world", Collections.emptyList()); +System.out.printf("Started container: %s%n", container); + } + + @Test + public void killContainer() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("debian", Arrays.asList("/bin/bash", "-c", "sleep 60")); +docker.killContainer(container); + } + + private static DockerCommand getWrapper() { +return DockerCommand.forCommand("docker", Duration.ofMillis(1)); + } Review comment: Would be good to assert also that exceptions actually include the necessary output. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94359) Time Spent: 20h 10m (was: 20h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 20h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94357&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94357 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562825 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/DockerCommandTest.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import org.apache.beam.runners.fnexecution.environment.testing.NeedsDocker; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DockerCommand}. 
*/ +@Category(NeedsDocker.class) +@RunWith(JUnit4.class) +public class DockerCommandTest { + + @Test + public void helloWorld() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("hello-world", Collections.emptyList()); +System.out.printf("Started container: %s%n", container); + } + + @Test + public void killContainer() throws Exception { +DockerCommand docker = getWrapper(); +String container = docker.runImage("debian", Arrays.asList("/bin/bash", "-c", "sleep 60")); +docker.killContainer(container); Review comment: Assert that this takes less than 60 seconds? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94357) Time Spent: 19h 50m (was: 19h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 19h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94355&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94355 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562819 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** A docker command wrapper. Simplifies communications with the Docker daemon. */ +class DockerCommand { + // TODO: Should we require 64-character container ids? Docker technically allows abbreviated ids, + // but we _should_ always capture full ids. + private static final Pattern CONTAINER_ID_PATTERN = Pattern.compile("\\p{XDigit}{64}"); + + static DockerCommand forCommand(String dockerExecutable, Duration commandTimeout) { Review comment: An invocation looks like DockerCommand.forCommand(dockerExecutable,...) - a bit odd. Maybe DockerCommands.forExecutable(...)? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94355) Time Spent: 19h 40m (was: 19.5h) > Add abstractions to manage Environment Instance lifecycles. 
> --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 19h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
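The TODO in the quoted DockerCommand source asks whether full 64-character container ids should be required. The check implied by the quoted CONTAINER_ID_PATTERN can be exercised in isolation, as in the sketch below. ContainerIds and isFullId are hypothetical names for illustration; only the regular expression itself is taken from the diff.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not from the PR) around the 64-hex-digit id pattern in the diff. */
final class ContainerIds {
  // Same expression as CONTAINER_ID_PATTERN in the quoted DockerCommand source.
  private static final Pattern FULL_ID = Pattern.compile("\\p{XDigit}{64}");

  private ContainerIds() {}

  /** Returns true only if the whole string is exactly 64 hexadecimal digits. */
  static boolean isFullId(String candidate) {
    // matches() anchors at both ends, so abbreviated or padded ids are rejected.
    return FULL_ID.matcher(candidate).matches();
  }
}
```

Under this check an abbreviated id, which Docker itself accepts, would be rejected, which is the trade-off the TODO is weighing.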
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94354 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562821 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** A docker command wrapper. Simplifies communications with the Docker daemon. */ +class DockerCommand { Review comment: DockerCommands perhaps? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94354) Time Spent: 19.5h (was: 19h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 19.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94356 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 23:03 Start Date: 23/Apr/18 23:03 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183562827 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with an the {@link InstructionRequestHandler} that it uses to do so. 
+ */ +@ThreadSafe +class DockerContainerEnvironment implements RemoteEnvironment { + + static DockerContainerEnvironment create( + DockerCommand docker, + Environment environment, + String containerId, + InstructionRequestHandler instructionHandler) { +return new DockerContainerEnvironment(docker, environment, containerId, instructionHandler); + } + + private final Object lock = new Object(); + private final DockerCommand docker; + private final Environment environment; + private final String containerId; + private final InstructionRequestHandler instructionHandler; + + private DockerContainerEnvironment( + DockerCommand docker, + Environment environment, + String containerId, + InstructionRequestHandler instructionHandler) { +this.docker = docker; +this.environment = environment; +this.containerId = containerId; +this.instructionHandler = instructionHandler; + } + + @Override + public Environment getEnvironment() { +return environment; + } + + @Override + public InstructionRequestHandler getInstructionRequestHandler() { +return instructionHandler; + } + + /** + * Closes this remote docker environment. The associated {@link InstructionRequestHandler} should + * not be used after calling this. + */ + @Override + public void close() throws Exception { +synchronized (lock) { Review comment: Could you add a comment why we add this synchronization? Is it because calling instructionHandler.close() from two threads at the same time is unsafe, or calling docker.killContainer(containerId) is unsafe, or because it's unsafe if two threads both do instructionHandler.close() and then both do docker.killContainer()? (but all of this is safe if done from two threads sequentially?) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94356) Time Spent: 19h 50m (was: 19h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 19h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
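The synchronization question in the review comment above can be illustrated with a minimal sketch. It is not the code from pull request #5189: RecordingResource, GuardedCloseSketch, and the closed flag are hypothetical names introduced here, and the two AutoCloseable fields merely stand in for the InstructionRequestHandler and the Docker container. The sketch assumes the intent is that concurrent callers of close() are serialized and that cleanup runs only once.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the handler and the container; not a Beam class. */
class RecordingResource implements AutoCloseable {
  private final String name;
  private final List<String> log;

  RecordingResource(String name, List<String> log) {
    this.name = name;
    this.log = log;
  }

  @Override
  public void close() {
    log.add("close " + name);
  }
}

/** Sketch of an environment whose close() is serialized and runs its cleanup once. */
class GuardedCloseSketch implements AutoCloseable {
  private final Object lock = new Object();
  private final AutoCloseable handler;
  private final AutoCloseable container;
  // Guarded by lock. This flag is an assumption; the quoted diff does not show one.
  private boolean closed;

  GuardedCloseSketch(AutoCloseable handler, AutoCloseable container) {
    this.handler = handler;
    this.container = container;
  }

  @Override
  public void close() throws Exception {
    synchronized (lock) {
      if (closed) {
        // A second caller blocks until the first has finished, then does nothing.
        return;
      }
      closed = true;
      try {
        handler.close();
      } finally {
        // Always release the container, even if closing the handler fails.
        container.close();
      }
    }
  }

  /** Closes one environment from two threads and returns the recorded cleanup steps. */
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    GuardedCloseSketch env =
        new GuardedCloseSketch(
            new RecordingResource("handler", log), new RecordingResource("container", log));
    Runnable closer =
        () -> {
          try {
            env.close();
          } catch (Exception e) {
            throw new IllegalStateException(e);
          }
        };
    Thread first = new Thread(closer);
    Thread second = new Thread(closer);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With the flag in place, a caller that loses the race still returns only after cleanup has completed, which is the blocking behaviour discussed elsewhere in this thread. Whether the real class needs the flag depends on whether the handler and docker calls tolerate being invoked twice, which the quoted diff does not say.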
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94306&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94306 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 21:17 Start Date: 23/Apr/18 21:17 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183540566 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; +import org.apache.beam.runners.fnexecution.control.ControlClientSource; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; + +/** + * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned + * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not + * thread-safe. + */ +public class DockerEnvironmentFactory implements EnvironmentFactory { + + public static DockerEnvironmentFactory forServices( + DockerWrapper docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + ControlClientSource clientSource, + // TODO: Refine this to IdGenerator when we determine where that should live. 
+ Supplier idGenerator) { +return new DockerEnvironmentFactory( +docker, +controlServiceServer, +loggingServiceServer, +retrievalServiceServer, +provisioningServiceServer, +idGenerator, +clientSource); + } + + private final DockerWrapper docker; + private final GrpcFnServer controlServiceServer; + private final GrpcFnServer loggingServiceServer; + private final GrpcFnServer retrievalServiceServer; + private final GrpcFnServer provisioningServiceServer; + private final Supplier idGenerator; + private final ControlClientSource clientSource; + + private DockerEnvironmentFactory( + DockerWrapper docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + Supplier idGenerator, + ControlClientSource clientSource) { +this.docker = docker; +this.controlServiceServer = controlServiceServer; +this.loggingServiceServer = loggingServiceServer; +this.retrievalServiceServer = retrievalServiceServer; +this.provisioningServiceServer = provisioningServiceServer; +this.idGenerator = idGenerator; +this.clientSource = clientSource; + } + + /** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */ + @Override + public RemoteEnvironment createEnvironment(Environment environment) throws Exception { +String workerId = idGenerator.get(); + +// Prepare docker invocation. +Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory"); +Path semiPersistentDirectory = Files.createTempDirectory("semi_persistent_dir"); +String containerImage = environment.getUrl(); +// TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not +// work for Docker for Mac
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94298 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 21:02 Start Date: 23/Apr/18 21:02 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r183536751 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/SdkHarnessManager.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.manager; + +import org.apache.beam.model.fnexecution.v1.ProvisionApi; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; + +/** + * A manager of resources related to the SDK Harness, capable of providing RemoteBundles to runner + * operators. 
+ * + * In order to provide a simple interface to runner operators, the SdkHarnessManager is + * responsible for owning and managing the lifetimes of resources such as RPC servers and remote + * environments. It is responsible for both instantiation and cleanup of these resources. Since all + * managed resources are owned by the SdkHarnessManager, it is responsible for cleaning them up when + * its close function is called. + */ +public interface SdkHarnessManager extends AutoCloseable { + /** + * Register a {@link ProvisionApi.ProvisionInfo} for a job. + */ + void registerJobInfo(ProvisionApi.ProvisionInfo jobInfo); Review comment: `StaticGrpcProvisionService` only requires `ProvisionInfo` as input, so I'm not entirely sure what service would consume this argument if I passed it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94298) Time Spent: 19h (was: 18h 50m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 19h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94243 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 18:52 Start Date: 23/Apr/18 18:52 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5152: [BEAM-3327] Harness Manager Interfaces URL: https://github.com/apache/beam/pull/5152#discussion_r183501181 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/SdkHarnessManager.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.manager; + +import org.apache.beam.model.fnexecution.v1.ProvisionApi; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; + +/** + * A manager of resources related to the SDK Harness, capable of providing RemoteBundles to runner + * operators. 
+ * + * In order to provide a simple interface to runner operators, the SdkHarnessManager is + * responsible for owning and managing the lifetimes of resources such as RPC servers and remote + * environments. It is responsible for both instantiation and cleanup of these resources. Since all + * managed resources are owned by the SdkHarnessManager, it is responsible for cleaning them up when + * its close function is called. + */ +public interface SdkHarnessManager extends AutoCloseable { + /** + * Register a {@link ProvisionApi.ProvisionInfo} for a job. + */ + void registerJobInfo(ProvisionApi.ProvisionInfo jobInfo); + + + /** + * Register an {@link ArtifactSource} as available for a job. + * @param jobId ID for the job the {@link ArtifactSource} belongs to. + * @param artifactSource The {@link ArtifactSource} to register. + * @return A closeable handle notifying the manager that the {@link ArtifactSource} is no longer + * available. + */ + AutoCloseable registerArtifactSource(String jobId, ArtifactSource artifactSource); Review comment: The choice to do it this way was to anticipate runners for which ArtifactSources, ProvisioningInfo, and StateRequestHandlers had lifetimes that were not necessarily the same as in the current Flink runner. This was largely in response to your previous point: > some runners will have persistent ArtifactSources and Provisioning information, at minimum, and should not need to respecify a server-shared one every time. We could easily pack it all back up as just an argument, but the question remains: do we want to anticipate runners that need to decouple ArtifactSource availability from the lifetime of an operation? ProvisioningInfo is a little different, since it is a data container, and does not facilitate communication with the worker in the same way that ArtifactSource or StateRequestHandlers do. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94243) Time Spent: 18h 50m (was: 18h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 18h 50m > Remaining Estimate: 0h > > This permits remote stage ex
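The "closeable handle" returned by registerArtifactSource in the diff above can be sketched as follows. This is only an illustration of the registration pattern under discussion, not the SdkHarnessManager implementation: ArtifactRegistrySketch and its methods are hypothetical, and a plain Object stands in for ArtifactSource. The sketch assumes one source per job id.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a registry whose registrations are undone by closing the returned handle. */
class ArtifactRegistrySketch {
  // Maps a job id to its registered source. Object is a stand-in for ArtifactSource.
  private final Map<String, Object> sourcesByJob = new HashMap<>();

  /** Registers a source for a job and returns a handle that withdraws it when closed. */
  synchronized AutoCloseable register(String jobId, Object source) {
    if (sourcesByJob.putIfAbsent(jobId, source) != null) {
      throw new IllegalStateException("A source is already registered for job " + jobId);
    }
    return () -> unregister(jobId, source);
  }

  private synchronized void unregister(String jobId, Object source) {
    // Remove only this exact registration, so closing a stale handle twice is harmless.
    sourcesByJob.remove(jobId, source);
  }

  synchronized boolean isAvailable(String jobId) {
    return sourcesByJob.containsKey(jobId);
  }

  /** Returns availability while the handle is open and after it has been closed. */
  static boolean[] demo() {
    ArtifactRegistrySketch registry = new ArtifactRegistrySketch();
    boolean during;
    try (AutoCloseable handle = registry.register("job-1", new Object())) {
      during = registry.isAvailable("job-1");
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
    boolean after = registry.isAvailable("job-1");
    return new boolean[] {during, after};
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println("during=" + result[0] + " after=" + result[1]);
  }
}
```

The handle decouples the availability of a source from the lifetime of any single operation, which is the trade-off the review comment weighs against passing everything as an argument.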
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94239&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94239 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 18:31 Start Date: 23/Apr/18 18:31 Worklog Time Spent: 10m Work Description: bsidhom commented on issue #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#issuecomment-383676757 Cleaned up history. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94239) Time Spent: 18h 40m (was: 18.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 18h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94205 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 17:51 Start Date: 23/Apr/18 17:51 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183481818 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with and the {@link InstructionRequestHandler} that it uses to do so. 
+ * + * Accessors are thread-compatible. Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94205) Time Spent: 18.5h (was: 18h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 18.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=94200&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94200 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 23/Apr/18 17:38 Start Date: 23/Apr/18 17:38 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183470809 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with and the {@link InstructionRequestHandler} that it uses to do so. 
+ * + * Accessors are thread-compatible. Review comment: Hold on, then. If close isn't thread-safe but should block until completion, doesn't it have to just be synchronized regardless? Especially because I wouldn't really consider that to be a performance-critical piece of code. If so please do This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 94200) Time Spent: 18h 20m (was: 18h 10m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 18h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93545&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93545 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 02:09 Start Date: 21/Apr/18 02:09 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183197106 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
Review comment: Actually, I feel like that's a bit long. I'll let you guys sit on it over the weekend and see if we have consensus on Monday. ;) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93545) Time Spent: 18h 10m (was: 18h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 18h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93543&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93543 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 02:08 Start Date: 21/Apr/18 02:08 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183197087 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
Review comment: Synchronous/blocking is implied by the ControlClientPool contract (i.e., that calls to the source block). I'll go with `SingleUsePerWorkerClientControlClientPool`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93543) Time Spent: 18h (was: 17h 50m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 18h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93542&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93542 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 02:06 Start Date: 21/Apr/18 02:06 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183197040 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = + Maps.newHashMap(); + + private MapControlClientPool(Duration timeout) { +this.timeout = timeout; + } + + @Override + public Source getSource() { +return this::getClient; + } + + @Override + public Sink getSink() { +return this::putClient; + } + + private void putClient(String workerId, InstructionRequestHandler client) { +synchronized (lock) { + CompletableFuture<InstructionRequestHandler> future = + clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); + boolean success = future.complete(client); + if (!success) { +throw new IllegalStateException( +String.format("Control client for worker id %s failed to complete", workerId)); + } +} + } + + private InstructionRequestHandler getClient(String workerId) + throws ExecutionException, InterruptedException, TimeoutException { +CompletableFuture<InstructionRequestHandler> future; +synchronized (lock) { + future = clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); Review comment: Done. This entire ordeal is making me very sad... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93542) Time Spent: 17h 50m (was: 17h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17h 50m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93537&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93537 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 01:49 Start Date: 21/Apr/18 01:49 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183196619 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying Docker container that it + * communicates with and the {@link InstructionRequestHandler} that it uses to do so. 
+ * + * Accessors are thread-compatible. Review comment: That depends. `close()` implementations usually block until completion. For example, we (will) depend on this behavior when executing ExecutableStages on runners in order to await completion and surface exceptions during bundle processing. Using an `AtomicBoolean` this way means that some calls may return before the container has been killed. Whether that's considered "safe" depends on how it's used, but I'm hesitant to go that far. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93537) Time Spent: 17h 40m (was: 17.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
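The distinction drawn in this comment can be illustrated with a minimal Java sketch. It is not code from the pull request: `GatedCloseSketch` and its latches are hypothetical stand-ins for an environment whose cleanup (for example, killing a container) takes some time. It shows that when `close()` is gated only by an `AtomicBoolean`, a second caller can return while the first caller's cleanup is still in progress.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for an environment whose cleanup is slow. Not the PR's class. */
final class GatedCloseSketch {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final CountDownLatch cleanupStarted = new CountDownLatch(1);
  private final CountDownLatch allowCleanupToFinish = new CountDownLatch(1);
  private volatile boolean cleanedUp = false;

  /** Only the first caller performs cleanup; every later caller returns immediately. */
  void close() throws InterruptedException {
    if (!closed.compareAndSet(false, true)) {
      return; // May return while the first caller is still cleaning up.
    }
    cleanupStarted.countDown();
    allowCleanupToFinish.await(); // Simulates slow cleanup work.
    cleanedUp = true;
  }

  /** Returns true if a second close() returned before the first caller's cleanup finished. */
  static boolean secondCloseReturnsEarly() throws InterruptedException {
    GatedCloseSketch env = new GatedCloseSketch();
    Thread first = new Thread(() -> {
      try {
        env.close();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    first.start();
    env.cleanupStarted.await(); // The first caller has won the gate.
    env.close(); // The second caller returns without waiting.
    boolean returnedBeforeCleanup = !env.cleanedUp;
    env.allowCleanupToFinish.countDown(); // Let the first caller finish.
    first.join();
    return returnedBeforeCleanup && env.cleanedUp;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(secondCloseReturnsEarly());
  }
}
```

If callers rely on `close()` blocking until the resource is gone, one option is to make later callers wait on the same completion signal (or to synchronize the whole method) instead of returning at the gate.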
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93525&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93525 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 01:18 Start Date: 21/Apr/18 01:18 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183195355 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = + Maps.newHashMap(); + + private MapControlClientPool(Duration timeout) { +this.timeout = timeout; + } + + @Override + public Source getSource() { +return this::getClient; + } + + @Override + public Sink getSink() { +return this::putClient; + } + + private void putClient(String workerId, InstructionRequestHandler client) { +synchronized (lock) { + CompletableFuture<InstructionRequestHandler> future = + clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); + boolean success = future.complete(client); + if (!success) { +throw new IllegalStateException( +String.format("Control client for worker id %s failed to complete", workerId)); + } +} + } + + private InstructionRequestHandler getClient(String workerId) + throws ExecutionException, InterruptedException, TimeoutException { +CompletableFuture<InstructionRequestHandler> future; +synchronized (lock) { + future = clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); Review comment: Hmm. I'm going to be very disappointed if we ever get worker IDs that are "", and although I hate conflating `null` and `""`, I might suggest doing so for the time being, and documenting that that code block should be removed once everyone populates worker IDs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93525) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17h 20m > Remaining Estimate: 0h > > This permits remote
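The suggestion in this comment, treating a missing worker ID as the empty string for the time being, might look roughly like the following Java sketch. The class and method names (`WorkerIdSketch`, `normalize`, `distinctKeys`) are hypothetical and do not come from the pull request; the sketch only assumes that IDs arrive as possibly-null strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper; not part of the PR. */
final class WorkerIdSketch {
  /**
   * Temporary shim: conflates a missing (null) worker id with "".
   * Intended to be removed once every harness populates its worker id.
   */
  static String normalize(String workerId) {
    return workerId == null ? "" : workerId;
  }

  /** Counts the distinct keys that result after normalization. */
  static int distinctKeys(String... workerIds) {
    Map<String, Boolean> seen = new ConcurrentHashMap<>();
    for (String id : workerIds) {
      seen.put(normalize(id), Boolean.TRUE); // Never passes a null key to the map.
    }
    return seen.size();
  }

  public static void main(String[] args) {
    System.out.println(distinctKeys(null, "", "worker-1"));
  }
}
```

The cost of this shim is the one the comment names: a harness that sends no ID and one that sends `""` become indistinguishable.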
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93524&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93524 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 01:18 Start Date: 21/Apr/18 01:18 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183195578 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying Docker container that it + * communicates with and the {@link InstructionRequestHandler} that it uses to do so. 
+ * + * Accessors are thread-compatible. Review comment: Actually, if you gate `close()` with an `AtomicBoolean`, this is just thread-safe, isn't it? Returned objects may not be, but that doesn't seem like it's unreasonable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93524) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93523&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93523 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 01:18 Start Date: 21/Apr/18 01:18 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183195443 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
Review comment: Something with `Paired` in the name? or `Synchronous`, like the queue? (The specific relation between the source and sink needs to be related, but as I said I'm going to be surprised if a runner harness ever asks for an SDK harness by ID explicitly, and it probably should never even see this interface directly regardless - it's too low level) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93523) Time Spent: 17h 20m (was: 17h 10m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17h 20m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93526&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93526 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 01:18 Start Date: 21/Apr/18 01:18 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183195717 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
Review comment: (SynchronousQueue is really the wrong name for the interface people use SynchronousQueue for, even if it can be used as a BlockingQueue) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93526) Time Spent: 17.5h (was: 17h 20m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17.5h > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
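For readers unfamiliar with the class the naming suggestion borrows from, here is a small Java sketch of `java.util.concurrent.SynchronousQueue` behaviour. It is only an illustration of the standard library class, with a hypothetical wrapper name (`HandoffSketch`); it says nothing about how the pool in this pull request is implemented.

```java
import java.util.concurrent.SynchronousQueue;

/** Hypothetical demo class; illustrates the JDK's SynchronousQueue only. */
final class HandoffSketch {
  /** Hands a value from a producer thread directly to the calling (consumer) thread. */
  static String handOff(String value) throws InterruptedException {
    SynchronousQueue<String> queue = new SynchronousQueue<>();

    // The queue has no capacity: with no consumer waiting, a non-blocking offer fails.
    if (queue.offer(value)) {
      throw new IllegalStateException("offer should fail when no consumer is waiting");
    }

    Thread producer = new Thread(() -> {
      try {
        queue.put(value); // Blocks until a consumer takes the element.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    String received = queue.take(); // Rendezvous with the producer.
    producer.join();
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(handOff("client"));
  }
}
```

The class behaves as a pairing point between one producer and one consumer and does not store elements, which is the sense in which a `Paired` or `Synchronous` name would describe a source and sink that meet on a single worker ID.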
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93509&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93509 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:45 Start Date: 21/Apr/18 00:45 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183194343 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
Review comment: How about `SingleUseClientControlClientPool` or `SingleUsePerWorkerClientControlClientPool`? `Map` is an implementation detail that the name can omit in favor of the functional detail (single use per client). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93509) Time Spent: 17h 10m (was: 17h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 17h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93505&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93505 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183191479 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = + Maps.newHashMap(); + + private MapControlClientPool(Duration timeout) { +this.timeout = timeout; + } + + @Override + public Source getSource() { +return this::getClient; + } + + @Override + public Sink getSink() { +return this::putClient; + } + + private void putClient(String workerId, InstructionRequestHandler client) { +synchronized (lock) { + CompletableFuture<InstructionRequestHandler> future = + clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); + boolean success = future.complete(client); + if (!success) { +throw new IllegalStateException( +String.format("Control client for worker id %s failed to complete", workerId)); + } +} + } + + private InstructionRequestHandler getClient(String workerId) + throws ExecutionException, InterruptedException, TimeoutException { +CompletableFuture<InstructionRequestHandler> future; +synchronized (lock) { + future = clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); Review comment: Good point. I had previously expected to do more "transaction" management of operations, but having settled on single-use worker ids, all of these problems went away. I should have noticed that when Eugene mentioned the possible torn critical section. ;) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93505) Time Spent: 16h 50m (was: 16h 40m) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 16h 50m > Remaining E
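The single-use rendezvous that this comment settles on can be sketched in Java as follows. This is a simplified illustration, not the class under review: `SingleUsePoolSketch`, its generic client type, and the millisecond timeout parameter are assumptions made for the example, and it uses a concurrent map instead of the explicit lock in the quoted diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Simplified, hypothetical pool in which each worker id is provided and requested once. */
final class SingleUsePoolSketch<T> {
  private final ConcurrentMap<String, CompletableFuture<T>> clients = new ConcurrentHashMap<>();
  private final long timeoutMillis;

  SingleUsePoolSketch(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  /** Sink side: completes the future for this worker id, creating it if nobody is waiting yet. */
  void put(String workerId, T client) {
    CompletableFuture<T> future =
        clients.computeIfAbsent(workerId, id -> new CompletableFuture<>());
    if (!future.complete(client)) {
      throw new IllegalStateException(
          String.format("Control client for worker id %s was already provided", workerId));
    }
  }

  /** Source side: waits (up to the timeout) for the client registered under this worker id. */
  T get(String workerId) throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<T> future =
        clients.computeIfAbsent(workerId, id -> new CompletableFuture<>());
    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    SingleUsePoolSketch<String> pool = new SingleUsePoolSketch<>(100);
    pool.put("worker-1", "client-1");
    System.out.println(pool.get("worker-1"));
  }
}
```

Because whichever side arrives first creates the future and the other side finds it, neither `put` nor `get` needs a multi-step critical section, which is plausibly why the earlier "transaction" concerns no longer applied.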
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93502&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93502 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183192781 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = + Maps.newHashMap(); + + private MapControlClientPool(Duration timeout) { +this.timeout = timeout; + } + + @Override + public Source getSource() { +return this::getClient; + } + + @Override + public Sink getSink() { +return this::putClient; + } + + private void putClient(String workerId, InstructionRequestHandler client) { +synchronized (lock) { + CompletableFuture<InstructionRequestHandler> future = + clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); + boolean success = future.complete(client); + if (!success) { +throw new IllegalStateException( +String.format("Control client for worker id %s failed to complete", workerId)); + } +} + } + + private InstructionRequestHandler getClient(String workerId) + throws ExecutionException, InterruptedException, TimeoutException { +CompletableFuture<InstructionRequestHandler> future; +synchronized (lock) { + future = clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); Review comment: @angoenka @tgroh OK, I had to revert it in the end because `ConcurrentHashMap` is null-hostile. The nulls are coming from Java harnesses that do not specify a worker-id. I've created https://issues.apache.org/jira/browse/BEAM-4149 to track this, but this is too large a change to fix here. I don't think it's worth using Optional keys here just to get around this, but let me know what you think. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93502) Time Spent: 16h 40m (was: 16.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas
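The "null-hostile" behaviour mentioned in this comment is a documented property of the JDK classes and can be checked with a short Java sketch. `NullKeySketch` and `acceptsNullKey` are hypothetical names used only for this demonstration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo class contrasting null-key handling in two JDK maps. */
final class NullKeySketch {
  /** Returns true if the map accepts a null key in computeIfAbsent, false if it throws. */
  static boolean acceptsNullKey(Map<String, String> map) {
    try {
      map.computeIfAbsent(null, key -> "client");
      return true;
    } catch (NullPointerException e) {
      return false; // ConcurrentHashMap rejects null keys (and null values).
    }
  }

  public static void main(String[] args) {
    System.out.println("HashMap: " + acceptsNullKey(new HashMap<>()));
    System.out.println("ConcurrentHashMap: " + acceptsNullKey(new ConcurrentHashMap<>()));
  }
}
```

A plain `HashMap` guarded by an explicit lock tolerates the null worker IDs that some harnesses currently send, whereas switching to `ConcurrentHashMap` would first require the IDs to be non-null.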
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93501&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93501 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183193747 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerWrapper.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** A docker command wrapper. Simplifies communications with the Docker daemon. */ +class DockerWrapper { Review comment: Done. (`DockerCommand` without the `s`). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 93501) Time Spent: 16h 40m (was: 16.5h) > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Ben Sidhom >Priority: Major > Labels: portability > Time Spent: 16h 40m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93506&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93506 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183190992 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java ## @@ -78,8 +83,16 @@ public static FnApiControlClientPoolService offeringClientsToPool( // service is closed, in which case the client will be discarded when the service is // discarded, which should be performed by a call to #shutdownNow. The remote caller must be // able to handle an unexpectedly terminated connection. - vendedClients.add(newClient); - clientPool.accept(newClient); + synchronized (lock) { +checkState( +!closed, "%s already closed", FnApiControlClientPoolService.class.getSimpleName()); +// TODO: https://issues.apache.org/jira/browse/BEAM-4151: Prevent stale client references +// from leaking. +vendedClients.add(newClient); + } + // NOTE: The client sink must provide its own thread safety. We do not attempt to Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 93506) Time Spent: 16h 50m (was: 16h 40m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93503 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183191808 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with and the {@link InstructionRequestHandler} that it uses to do so. 
+ * + * Accessors are thread-compatible. Review comment: "Thread-compatible" in this case means that methods that do not mutate the object do not require external synchronization. From my read that means "conditionally thread-safe", but it's only worth mentioning if `InstructionRequestHandler` itself is thread-safe and we expect clients to share these (which I believe to be true). I'm not sure how to word this though. Do I just say "conditionally thread-safe" and assume it's understood that the thread-safe methods are the non-mutating ones? This is implied by "thread-compatible", but I'm not sure how widely used that is. Issue Time Tracking --- Worklog Id: (was: 93503)
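Editor's illustration of the distinction discussed in the comment above: a minimal sketch, assuming a wrapper whose accessors read only final fields while its mutating methods rely on the caller for synchronization. The class `EnvironmentHandleSketch` and all of its members are hypothetical and are not taken from the pull request.

```java
/**
 * Hypothetical stand-in for an environment wrapper (not the Beam class).
 *
 * Documented contract, in the sense used in the review comment: accessors that do
 * not mutate the object need no external synchronization; close() and isClosed() do.
 */
public final class EnvironmentHandleSketch implements AutoCloseable {
  // Final fields are safely published by the constructor and never change afterwards.
  private final String containerId;
  private final String handlerName;

  // Mutable state: deliberately neither volatile nor guarded by an internal lock,
  // so callers must synchronize externally around close() and isClosed().
  private boolean closed;

  public EnvironmentHandleSketch(String containerId, String handlerName) {
    this.containerId = containerId;
    this.handlerName = handlerName;
  }

  /** Safe to call from multiple threads without synchronization. */
  public String getContainerId() {
    return containerId;
  }

  /** Safe to call from multiple threads without synchronization. */
  public String getHandlerName() {
    return handlerName;
  }

  /** Requires the same external synchronization as close(). */
  public boolean isClosed() {
    return closed;
  }

  /** Mutates the object; callers must synchronize externally. */
  @Override
  public void close() {
    closed = true;
  }

  public static void main(String[] args) {
    EnvironmentHandleSketch handle = new EnvironmentHandleSketch("container-1", "handler-1");
    // Accessors can be shared freely between threads.
    System.out.println(handle.getContainerId());
    // Mutation is wrapped in a lock chosen by the caller.
    synchronized (handle) {
      handle.close();
    }
  }
}
```

Whichever label the Javadoc ends up using, spelling out which methods fall on which side of this line avoids relying on readers knowing the term.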
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93500&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93500 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183191282 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. Review comment: Hmm... 
I like the idea of including the single-use-per-worker in the name, but I think that `SingleUseMapControlClientPool` gives the idea that the pool itself can only be used once or something like that. Other suggestions? Issue Time Tracking --- Worklog Id: (was: 93500) Time Spent: 16h 40m (was: 16.5h)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93504&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93504 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183191383 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 93504)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93507 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 21/Apr/18 00:36 Start Date: 21/Apr/18 00:36 Worklog Time Spent: 10m Work Description: bsidhom commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183193526 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; +import org.apache.beam.runners.fnexecution.control.ControlClientSource; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; + +/** + * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned + * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not + * thread-safe. + */ +public class DockerEnvironmentFactory implements EnvironmentFactory { + + public static DockerEnvironmentFactory forServices( + DockerWrapper docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + ControlClientSource clientSource, + // TODO: Refine this to IdGenerator when we determine where that should live. 
+ Supplier idGenerator) { +return new DockerEnvironmentFactory( +docker, +controlServiceServer, +loggingServiceServer, +retrievalServiceServer, +provisioningServiceServer, +idGenerator, +clientSource); + } + + private final DockerWrapper docker; + private final GrpcFnServer controlServiceServer; + private final GrpcFnServer loggingServiceServer; + private final GrpcFnServer retrievalServiceServer; + private final GrpcFnServer provisioningServiceServer; + private final Supplier idGenerator; + private final ControlClientSource clientSource; + + private DockerEnvironmentFactory( + DockerWrapper docker, + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + Supplier idGenerator, + ControlClientSource clientSource) { +this.docker = docker; +this.controlServiceServer = controlServiceServer; +this.loggingServiceServer = loggingServiceServer; +this.retrievalServiceServer = retrievalServiceServer; +this.provisioningServiceServer = provisioningServiceServer; +this.idGenerator = idGenerator; +this.clientSource = clientSource; + } + + /** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */ + @Override + public RemoteEnvironment createEnvironment(Environment environment) throws Exception { +String workerId = idGenerator.get(); + +// Prepare docker invocation. +Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory"); +Path semiPersistentDirectory = Files.createTempDirectory("semi_persistent_dir"); +String containerImage = environment.getUrl(); +// TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not +// work for Docker for Mac
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93490 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 20/Apr/18 23:29 Start Date: 20/Apr/18 23:29 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183187816 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = + Maps.newHashMap(); + + private MapControlClientPool(Duration timeout) { +this.timeout = timeout; + } + + @Override + public Source getSource() { +return this::getClient; + } + + @Override + public Sink getSink() { +return this::putClient; + } + + private void putClient(String workerId, InstructionRequestHandler client) { +synchronized (lock) { + CompletableFuture<InstructionRequestHandler> future = + clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); + boolean success = future.complete(client); + if (!success) { +throw new IllegalStateException( +String.format("Control client for worker id %s failed to complete", workerId)); + } +} + } + + private InstructionRequestHandler getClient(String workerId) + throws ExecutionException, InterruptedException, TimeoutException { +CompletableFuture<InstructionRequestHandler> future; +synchronized (lock) { + future = clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); Review comment: If this is a concurrent map, it's a thread-safe operation, and you shouldn't need to lock access to the map. Issue Time Tracking --- Worklog Id: (was: 93490) Time Spent: 16h (was: 15h 50m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93492&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93492 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 20/Apr/18 23:29 Start Date: 20/Apr/18 23:29 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183188262 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerContainerEnvironment.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.environment; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A {@link RemoteEnvironment} that wraps a running Docker container. + * + * A {@link DockerContainerEnvironment} owns both the underlying docker container that it + * communicates with and the {@link InstructionRequestHandler} that it uses to do so. 
+ * + * Accessors are thread-compatible. Review comment: Maybe worth referencing what this means. From my read of Effective Java point 70, this would just be "not thread-safe" in that book's breakdown. Issue Time Tracking --- Worklog Id: (was: 93492) Time Spent: 16h 20m (was: 16h 10m)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93493 ] ASF GitHub Bot logged work on BEAM-3327: Author: ASF GitHub Bot Created on: 20/Apr/18 23:29 Start Date: 20/Apr/18 23:29 Worklog Time Spent: 10m Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory URL: https://github.com/apache/beam/pull/5189#discussion_r183187927 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.fnexecution.control; + +import com.google.common.collect.Maps; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be + * requested at most once. 
+ */ +public class MapControlClientPool implements ControlClientPool { + + /** Creates a {@link MapControlClientPool} with an unspecified timeout. */ + public static MapControlClientPool create() { +return withTimeout(Duration.ofSeconds(Long.MAX_VALUE)); + } + + /** + * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source + * requests. + */ + public static MapControlClientPool withTimeout(Duration timeout) { +return new MapControlClientPool(timeout); + } + + private final Duration timeout; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final Map<String, CompletableFuture<InstructionRequestHandler>> clients = + Maps.newHashMap(); + + private MapControlClientPool(Duration timeout) { +this.timeout = timeout; + } + + @Override + public Source getSource() { +return this::getClient; + } + + @Override + public Sink getSink() { +return this::putClient; + } + + private void putClient(String workerId, InstructionRequestHandler client) { +synchronized (lock) { + CompletableFuture<InstructionRequestHandler> future = + clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); + boolean success = future.complete(client); + if (!success) { +throw new IllegalStateException( +String.format("Control client for worker id %s failed to complete", workerId)); + } +} + } + + private InstructionRequestHandler getClient(String workerId) + throws ExecutionException, InterruptedException, TimeoutException { +CompletableFuture<InstructionRequestHandler> future; +synchronized (lock) { + future = clients.computeIfAbsent(workerId, MapControlClientPool::createClientFuture); Review comment: e.g. everyone who gets the future calls `ConcurrentMap.computeIfAbsent(id, createClientFuture)`, and just uses the returned value every time. Removes all of the associated synchronization. Issue Time Tracking --- Worklog Id: (was: 93493) Time Spent: 16.5h (was: 16h 20m)
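Editor's illustration of the reviewer's suggestion: a minimal sketch, assuming the pool only needs per-worker futures, of how a `ConcurrentHashMap` plus `computeIfAbsent` could replace the explicit lock. The class `ConcurrentClientPoolSketch`, its generic parameter `T` (standing in for `InstructionRequestHandler`), and the method names `put`/`get` are hypothetical; this is not the code that was merged.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical lock-free variant of a map-backed client pool. */
public class ConcurrentClientPoolSketch<T> {
  // computeIfAbsent on ConcurrentHashMap is atomic per key, so no external lock is needed.
  private final ConcurrentMap<String, CompletableFuture<T>> clients = new ConcurrentHashMap<>();
  private final Duration timeout;

  public ConcurrentClientPoolSketch(Duration timeout) {
    this.timeout = timeout;
  }

  // Both producers and consumers obtain the same future for a given worker id.
  private CompletableFuture<T> futureFor(String workerId) {
    return clients.computeIfAbsent(workerId, id -> new CompletableFuture<T>());
  }

  /** Registers a client; complete() returns false if the future was already completed. */
  public void put(String workerId, T client) {
    if (!futureFor(workerId).complete(client)) {
      throw new IllegalStateException(
          String.format("Control client for worker id %s was already registered", workerId));
    }
  }

  /** Waits up to the configured timeout for the client to be registered. */
  public T get(String workerId)
      throws ExecutionException, InterruptedException, TimeoutException {
    return futureFor(workerId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    ConcurrentClientPoolSketch<String> pool =
        new ConcurrentClientPoolSketch<>(Duration.ofSeconds(5));
    // A consumer may ask for the client before the producer has registered it.
    CompletableFuture<String> pending =
        CompletableFuture.supplyAsync(
            () -> {
              try {
                return pool.get("worker-1");
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            });
    pool.put("worker-1", "client-for-worker-1");
    System.out.println(pending.get());
  }
}
```

The sketch never removes entries from the map, so a long-lived pool would still need some cleanup strategy; that concern is outside what the comment above addresses.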
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93494 ]

ASF GitHub Bot logged work on BEAM-3327:
Author: ASF GitHub Bot
Created on: 20/Apr/18 23:29
Start Date: 20/Apr/18 23:29
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r183188417

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java ##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientSource;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+
+/**
+ * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned
+ * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not
+ * thread-safe.
+ */
+public class DockerEnvironmentFactory implements EnvironmentFactory {
+
+  public static DockerEnvironmentFactory forServices(
+      DockerWrapper docker,
+      GrpcFnServer controlServiceServer,
+      GrpcFnServer loggingServiceServer,
+      GrpcFnServer retrievalServiceServer,
+      GrpcFnServer provisioningServiceServer,
+      ControlClientSource clientSource,
+      // TODO: Refine this to IdGenerator when we determine where that should live.
+      Supplier idGenerator) {
+    return new DockerEnvironmentFactory(
+        docker,
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer,
+        idGenerator,
+        clientSource);
+  }
+
+  private final DockerWrapper docker;
+  private final GrpcFnServer controlServiceServer;
+  private final GrpcFnServer loggingServiceServer;
+  private final GrpcFnServer retrievalServiceServer;
+  private final GrpcFnServer provisioningServiceServer;
+  private final Supplier idGenerator;
+  private final ControlClientSource clientSource;
+
+  private DockerEnvironmentFactory(
+      DockerWrapper docker,
+      GrpcFnServer controlServiceServer,
+      GrpcFnServer loggingServiceServer,
+      GrpcFnServer retrievalServiceServer,
+      GrpcFnServer provisioningServiceServer,
+      Supplier idGenerator,
+      ControlClientSource clientSource) {
+    this.docker = docker;
+    this.controlServiceServer = controlServiceServer;
+    this.loggingServiceServer = loggingServiceServer;
+    this.retrievalServiceServer = retrievalServiceServer;
+    this.provisioningServiceServer = provisioningServiceServer;
+    this.idGenerator = idGenerator;
+    this.clientSource = clientSource;
+  }
+
+  /** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */
+  @Override
+  public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
+    String workerId = idGenerator.get();
+
+    // Prepare docker invocation.
+    Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory");
+    Path semiPersistentDirectory = Files.createTempDirectory("semi_persistent_dir");
+    String containerImage = environment.getUrl();
+    // TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not
+    // work for Docker for Mac.
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93491&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93491 ]

ASF GitHub Bot logged work on BEAM-3327:
Author: ASF GitHub Bot
Created on: 20/Apr/18 23:29
Start Date: 20/Apr/18 23:29
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r18319

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerWrapper.java ##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** A docker command wrapper. Simplifies communications with the Docker daemon. */
+class DockerWrapper {

Review comment:
`DockerCommands`? `Wrapper` makes me think that there's some other java-y interface that we're using to talk to Docker. If you have strong feelings, well, I don't, so do what you think is best.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 93491)
Time Spent: 16h 10m (was: 16h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 16h 10m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93487&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93487 ]

ASF GitHub Bot logged work on BEAM-3327:
Author: ASF GitHub Bot
Created on: 20/Apr/18 23:25
Start Date: 20/Apr/18 23:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r183188207

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.Maps;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be
+ * requested at most once.

Review comment:
Should we rename `MapControlClientPool => SingleUseMapControlClientPool`

Issue Time Tracking
---
Worklog Id: (was: 93487)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 15h 40m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93488&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93488 ]

ASF GitHub Bot logged work on BEAM-3327:
Author: ASF GitHub Bot
Created on: 20/Apr/18 23:25
Start Date: 20/Apr/18 23:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r183187409

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/MapControlClientPool.java ##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.collect.Maps;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A {@link ControlClientPool} backed by a client map. It is expected that a given client id will be
+ * requested at most once.
+ */
+public class MapControlClientPool implements ControlClientPool {
+
+  /** Creates a {@link MapControlClientPool} with an unspecified timeout. */
+  public static MapControlClientPool create() {
+    return withTimeout(Duration.ofSeconds(Long.MAX_VALUE));
+  }
+
+  /**
+   * Creates a {@link MapControlClientPool} with the given timeout. Timeouts only apply to source
+   * requests.
+   */
+  public static MapControlClientPool withTimeout(Duration timeout) {
+    return new MapControlClientPool(timeout);
+  }
+
+  private final Duration timeout;
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final Map> clients =

Review comment:
Can we use ConcurrentHashMap and avoid using low level synchronization construct.

Issue Time Tracking
---
Worklog Id: (was: 93488)
Time Spent: 15h 50m (was: 15h 40m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 15h 50m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments
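For readers following the ConcurrentHashMap suggestion above, here is a minimal sketch of what a map-backed, single-use client pool could look like without an explicit lock. It is not the code from the pull request: the class name ConcurrentClientPoolSketch, the put/take method names, and the generic ClientT parameter are all hypothetical, and wrapping the checked exceptions in IllegalStateException is a simplification chosen for the example.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a single-use client pool; not Beam's MapControlClientPool. */
final class ConcurrentClientPoolSketch<ClientT> {
  // One future per worker id; computeIfAbsent makes creation atomic without a lock.
  private final ConcurrentMap<String, CompletableFuture<ClientT>> clients =
      new ConcurrentHashMap<>();
  // Assumed to be small enough for Duration#toMillis, which throws on overflow.
  private final Duration timeout;

  ConcurrentClientPoolSketch(Duration timeout) {
    this.timeout = timeout;
  }

  /** Sink side: registers the client for a worker id, whether or not a taker is waiting. */
  void put(String workerId, ClientT client) {
    CompletableFuture<ClientT> future =
        clients.computeIfAbsent(workerId, id -> new CompletableFuture<>());
    if (!future.complete(client)) {
      throw new IllegalStateException("Client already registered for " + workerId);
    }
  }

  /** Source side: waits up to the timeout for the client, then forgets the entry. */
  ClientT take(String workerId) {
    CompletableFuture<ClientT> future =
        clients.computeIfAbsent(workerId, id -> new CompletableFuture<>());
    try {
      return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for " + workerId, e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("No client available for " + workerId, e);
    } finally {
      // Each id is expected to be requested at most once, so drop the entry.
      clients.remove(workerId, future);
    }
  }
}
```

The trade-off in this sketch is that both sides go through computeIfAbsent, so the order in which the harness connects and the runner asks for it does not matter. A client that registers after its taker has timed out still leaves an entry behind, which is the same kind of stale reference that BEAM-4151 tracks.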
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93486 ]

ASF GitHub Bot logged work on BEAM-3327:
Author: ASF GitHub Bot
Created on: 20/Apr/18 23:25
Start Date: 20/Apr/18 23:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r183186307

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java ##
@@ -17,45 +17,50 @@
  */
 package org.apache.beam.runners.fnexecution.control;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
-import org.apache.beam.sdk.fn.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A Fn API control service which adds incoming SDK harness connections to a pool. */
+/** A Fn API control service which adds incoming SDK harness connections to a sink. */
 public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase
     implements FnService {
   private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
 
-  private final ThrowingConsumer clientPool;
-  private final Collection vendedClients = new CopyOnWriteArrayList<>();
+  private final Object lock = new Object();
+  private final ControlClientPool.Sink clientSink;
   private final HeaderAccessor headerAccessor;
-  private AtomicBoolean closed = new AtomicBoolean();
+
+  @GuardedBy("lock")

Review comment:
Nice!

Issue Time Tracking
---
Worklog Id: (was: 93486)
Time Spent: 15h 40m (was: 15.5h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 15h 40m
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments
[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=93485&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-93485 ]

ASF GitHub Bot logged work on BEAM-3327:
Author: ASF GitHub Bot
Created on: 20/Apr/18 23:25
Start Date: 20/Apr/18 23:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5189: [BEAM-3327] Basic Docker environment factory
URL: https://github.com/apache/beam/pull/5189#discussion_r183186120

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java ##
@@ -78,8 +83,16 @@ public static FnApiControlClientPoolService offeringClientsToPool(
       // service is closed, in which case the client will be discarded when the service is
       // discarded, which should be performed by a call to #shutdownNow. The remote caller must be
       // able to handle an unexpectedly terminated connection.
-      vendedClients.add(newClient);
-      clientPool.accept(newClient);
+      synchronized (lock) {
+        checkState(
+            !closed, "%s already closed", FnApiControlClientPoolService.class.getSimpleName());
+        // TODO: https://issues.apache.org/jira/browse/BEAM-4151: Prevent stale client references
+        // from leaking.
+        vendedClients.add(newClient);
+      }
+      // NOTE: The client sink must provide its own thread safety. We do not attempt to

Review comment:
Nit: We can move this note to ControlClientPool for easier reference.

Issue Time Tracking
---
Worklog Id: (was: 93485)
Time Spent: 15.5h (was: 15h 20m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
> Issue Type: New Feature
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 15.5h
> Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments
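The hunk under review guards the closed flag and the list of vended clients with one lock, then hands the client to the sink outside that lock. A standalone sketch of that pattern follows, assuming a plain java.util.function.Consumer in place of ControlClientPool.Sink. The names ClientRegistrySketch, register, close, and vendedCount are hypothetical and do not come from the pull request.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.function.Consumer;

/** Hypothetical sketch of the lock-then-hand-off pattern; not Beam's service class. */
final class ClientRegistrySketch<ClientT> {
  private final Object lock = new Object();
  // Both fields below are only read or written while holding the lock.
  private final Collection<ClientT> vendedClients = new ArrayList<>();
  private boolean closed = false;
  // Assumed stand-in for the client sink; it must provide its own thread safety.
  private final Consumer<ClientT> sink;

  ClientRegistrySketch(Consumer<ClientT> sink) {
    this.sink = sink;
  }

  /** Records the client under the lock, then offers it to the sink outside the lock. */
  void register(ClientT client) {
    synchronized (lock) {
      if (closed) {
        throw new IllegalStateException("ClientRegistrySketch already closed");
      }
      vendedClients.add(client);
    }
    // Calling out while holding the lock could block other registrations or deadlock,
    // so the hand-off happens after the synchronized block.
    sink.accept(client);
  }

  /** Marks the registry closed and drops all tracked clients. */
  void close() {
    synchronized (lock) {
      closed = true;
      vendedClients.clear();
    }
  }

  /** Number of clients currently tracked. */
  int vendedCount() {
    synchronized (lock) {
      return vendedClients.size();
    }
  }
}
```

Checking the flag and adding to the list inside one synchronized block closes the window that separate AtomicBoolean and CopyOnWriteArrayList fields leave open, where a client could be added after a concurrent close.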