[jira] [Created] (BEAM-5464) Portable beam hangs while running TFX preprocessing step on a distributed cluster
Axel Magnuson created BEAM-5464: --- Summary: Portable beam hangs while running TFX preprocessing step on a distributed cluster Key: BEAM-5464 URL: https://issues.apache.org/jira/browse/BEAM-5464 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Axel Magnuson Assignee: Ankur Goenka Recently I tried running the TFX taxi example on a Dataproc cluster. However, it always hung indefinitely. The Flink UI indicated that the job was halfway done, but I could not find any clear errors in the job driver logs, the job service logs, or the Flink logs. The root cause is still a mystery to me. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-5463) Artifact staging service cannot write to HDFS
Axel Magnuson created BEAM-5463: --- Summary: Artifact staging service cannot write to HDFS Key: BEAM-5463 URL: https://issues.apache.org/jira/browse/BEAM-5463 Project: Beam Issue Type: Bug Components: java-fn-execution Reporter: Axel Magnuson Assignee: Ankur Goenka With `--artifacts-dir=hdfs://tmp/beam-artifacts` set on the job service, Python wordcount fails. The artifact staging service produces the following error: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
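Beam resolves file paths by URI scheme, so this error most likely means that no file system implementation was registered for `hdfs` in the job service's process. Whether missing registration is the actual root cause here is an assumption. The sketch below is a minimal, hypothetical model of scheme-based lookup (SimpleFileSystem and SchemeRegistry are illustrative names, not Beam classes) that reproduces the same failure mode:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for a file system implementation (not Beam's FileSystem class).
interface SimpleFileSystem {
  String describe();
}

// Minimal sketch of scheme-based lookup: the URI scheme of a path selects the
// implementation, and an unregistered scheme fails with the message seen in this issue.
final class SchemeRegistry {
  private final Map<String, SimpleFileSystem> byScheme = new HashMap<>();

  void register(String scheme, SimpleFileSystem fs) {
    byScheme.put(scheme.toLowerCase(Locale.ROOT), fs);
  }

  SimpleFileSystem forPath(String path) {
    String scheme = URI.create(path).getScheme();
    // Paths without a scheme (e.g. "/tmp/x") are treated as local files.
    String key = scheme == null ? "file" : scheme.toLowerCase(Locale.ROOT);
    SimpleFileSystem fs = byScheme.get(key);
    if (fs == null) {
      throw new IllegalArgumentException("No filesystem found for scheme " + key);
    }
    return fs;
  }
}
```

In this model, the fix is to register an implementation for `hdfs` before the staging service resolves `--artifacts-dir`. In a real deployment, that would mean making an HDFS-capable file system available to the job service.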
[jira] [Created] (BEAM-5454) Improve Beam Fn API Artifact Staging Service Error Handling
Axel Magnuson created BEAM-5454: --- Summary: Improve Beam Fn API Artifact Staging Service Error Handling Key: BEAM-5454 URL: https://issues.apache.org/jira/browse/BEAM-5454 Project: Beam Issue Type: Improvement Components: java-fn-execution Reporter: Axel Magnuson Assignee: Axel Magnuson I noticed that the onComplete logic is not wrapped in a try/catch block, which led to an uncaught NPE in an issue I was debugging. It also does not log errors; it relies solely on the observer's callback to pass them to the SDK. However, some SDKs, such as Go's, do not surface these errors, so they get lost.
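The improvement described above amounts to guarding the completion logic, logging any failure on the server, and then forwarding it to the client. Below is a minimal sketch under stated assumptions. ResponseObserver is a hypothetical interface shaped like gRPC's StreamObserver, and GuardedStagingHandler and CompletionLogic are illustrative names, not the actual staging service code:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical observer shaped like gRPC's StreamObserver, defined here to avoid dependencies.
interface ResponseObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// Sketch of a staging handler whose completion step cannot throw uncaught:
// failures (e.g. an NPE) are logged server-side and then sent to the client.
final class GuardedStagingHandler {
  private static final Logger LOG = Logger.getLogger(GuardedStagingHandler.class.getName());

  // Hypothetical hook for the work done once the client finishes sending artifacts.
  interface CompletionLogic {
    String finish() throws Exception;
  }

  private final ResponseObserver<String> responseObserver;
  private final CompletionLogic completionLogic;

  GuardedStagingHandler(ResponseObserver<String> responseObserver, CompletionLogic completionLogic) {
    this.responseObserver = responseObserver;
    this.completionLogic = completionLogic;
  }

  void onCompleted() {
    String result;
    try {
      result = completionLogic.finish();
    } catch (Exception e) {
      // Log first, so the failure stays visible even if the SDK drops the error.
      LOG.log(Level.SEVERE, "Failed to complete artifact staging", e);
      responseObserver.onError(e);
      return;
    }
    responseObserver.onNext(result);
    responseObserver.onCompleted();
  }
}
```

Logging before calling onError means an SDK that ignores the error status no longer hides the failure from operators.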
[jira] [Closed] (BEAM-3673) FlinkRunner: Harness manager for connecting operators to SDK Harnesses
[ https://issues.apache.org/jira/browse/BEAM-3673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Axel Magnuson closed BEAM-3673. --- Resolution: Fixed Fix Version/s: 2.6.0 This was fixed a long time ago but never got closed. I'm going through and reviewing all my open issues. > FlinkRunner: Harness manager for connecting operators to SDK Harnesses > -- > > Key: BEAM-3673 > URL: https://issues.apache.org/jira/browse/BEAM-3673 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Major > Fix For: 2.6.0 > > > SDK harnesses require a common set of gRPC services to operate. The role of > the harness manager is to provide a uniform interface that multiplexes data > streams and auxiliary data between SDK environments and operators within a > given job. > Note that multiple operators may communicate with a single SDK environment to > amortize container initialization cost. Environments are _not_ shared between > different jobs. > The initial implementation will shell out to local docker, but the harness > manager should eventually support working with externally-managed > environments (e.g., created by Kubernetes).
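The sharing rule in the quoted description (operators within one job share an SDK environment, but environments are never shared between jobs) can be modeled as a per-job, reference-counted cache. This is a minimal sketch under stated assumptions. JobHarnessManager, HarnessHandle, and the startHarness function are hypothetical placeholders for whatever actually launches an environment (local Docker in the initial implementation), not Beam classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// One manager per job: operators asking for the same environment get the same
// harness handle, and the last operator to release it tears it down.
final class JobHarnessManager {
  static final class HarnessHandle {
    final String environmentId;
    int references;
    boolean closed;

    HarnessHandle(String environmentId) {
      this.environmentId = environmentId;
    }
  }

  private final Map<String, HarnessHandle> handles = new HashMap<>();
  private final Function<String, HarnessHandle> startHarness;

  JobHarnessManager(Function<String, HarnessHandle> startHarness) {
    this.startHarness = startHarness;
  }

  // Starts the environment only on first use within this job.
  synchronized HarnessHandle acquire(String environmentId) {
    HarnessHandle handle = handles.computeIfAbsent(environmentId, startHarness);
    handle.references++;
    return handle;
  }

  // The last release marks the harness closed and forgets it.
  synchronized void release(HarnessHandle handle) {
    if (--handle.references == 0) {
      handle.closed = true;
      handles.remove(handle.environmentId);
    }
  }
}
```

Because the cache lives in a per-job object rather than a global map, cross-job sharing is ruled out by construction.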
[jira] [Commented] (BEAM-3672) FlinkRunner: Implement an Artifact service using the Flink DistributedCache
[ https://issues.apache.org/jira/browse/BEAM-3672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588320#comment-16588320 ] Axel Magnuson commented on BEAM-3672: - Deprioritizing and unassigning this, since a viable ArtifactService that uses GCS already exists, and this is an old work item that I'm not sure is still worthwhile. > FlinkRunner: Implement an Artifact service using the Flink DistributedCache > --- > > Key: BEAM-3672 > URL: https://issues.apache.org/jira/browse/BEAM-3672 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Minor > > We need to have a DistributedCache-based artifact service to ship with the > portable Flink runner. The DistributedCache is a perfect fit for Flink > because it comes for free and is the mechanism that Flink already uses to > distribute its own artifacts. > > The final artifact service implementation should be pluggable, but using the > DistributedCache allows the Flink runner to work without additional external > dependencies (beyond perhaps Docker).
[jira] [Assigned] (BEAM-3672) FlinkRunner: Implement an Artifact service using the Flink DistributedCache
[ https://issues.apache.org/jira/browse/BEAM-3672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Axel Magnuson reassigned BEAM-3672: --- Assignee: Aljoscha Krettek (was: Axel Magnuson) > FlinkRunner: Implement an Artifact service using the Flink DistributedCache > --- > > Key: BEAM-3672 > URL: https://issues.apache.org/jira/browse/BEAM-3672 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Aljoscha Krettek >Priority: Minor > > We need to have a DistributedCache-based artifact service to ship with the > portable Flink runner. The DistributedCache is a perfect fit for Flink > because it comes for free and is the mechanism that Flink already uses to > distribute its own artifacts. > > The final artifact service implementation should be pluggable, but using the > DistributedCache allows the Flink runner to work without additional external > dependencies (beyond perhaps Docker).
[jira] [Updated] (BEAM-3672) FlinkRunner: Implement an Artifact service using the Flink DistributedCache
[ https://issues.apache.org/jira/browse/BEAM-3672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Axel Magnuson updated BEAM-3672: Priority: Minor (was: Major) > FlinkRunner: Implement an Artifact service using the Flink DistributedCache > --- > > Key: BEAM-3672 > URL: https://issues.apache.org/jira/browse/BEAM-3672 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Minor > > We need to have a DistributedCache-based artifact service to ship with the > portable Flink runner. The DistributedCache is a perfect fit for Flink > because it comes for free and is the mechanism that Flink already uses to > distribute its own artifacts. > > The final artifact service implementation should be pluggable, but using the > DistributedCache allows the Flink runner to work without additional external > dependencies (beyond perhaps Docker).
[jira] [Created] (BEAM-4519) Artifact Retrieval Service Protocol should be able to serve multiple Manifests.
Axel Magnuson created BEAM-4519: --- Summary: Artifact Retrieval Service Protocol should be able to serve multiple Manifests. Key: BEAM-4519 URL: https://issues.apache.org/jira/browse/BEAM-4519 Project: Beam Issue Type: Bug Components: runner-core Reporter: Axel Magnuson Assignee: Axel Magnuson The artifact staging service currently returns a staging_token that can be used as a key to access a manifest. However, the retrieval protocol does not currently have a field that accepts this token.
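One way to let a single retrieval service serve several manifests is to store each committed manifest under the staging token returned at commit time and to require that token on every retrieval call. This is a minimal sketch, assuming a manifest can be modeled as a list of artifact names. ManifestStore, commit, and getManifest are hypothetical names, not the Beam protocol:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Each staging session commits its manifest under a fresh token; retrieval
// requests carry that token, so one service can hold many manifests at once.
final class ManifestStore {
  private final Map<String, List<String>> manifestsByToken = new ConcurrentHashMap<>();

  // Staging side: store an immutable copy of the manifest and return its token.
  String commit(List<String> artifactNames) {
    String token = UUID.randomUUID().toString();
    manifestsByToken.put(token, Collections.unmodifiableList(new ArrayList<>(artifactNames)));
    return token;
  }

  // Retrieval side: the token selects which manifest to serve.
  List<String> getManifest(String stagingToken) {
    List<String> manifest = manifestsByToken.get(stagingToken);
    if (manifest == null) {
      throw new NoSuchElementException("Unknown staging token: " + stagingToken);
    }
    return manifest;
  }
}
```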
[jira] [Created] (BEAM-4491) DockerJobBundleFactory's ProvisionInfo is empty.
Axel Magnuson created BEAM-4491: --- Summary: DockerJobBundleFactory's ProvisionInfo is empty. Key: BEAM-4491 URL: https://issues.apache.org/jira/browse/BEAM-4491 Project: Beam Issue Type: Bug Components: runner-core Reporter: Axel Magnuson Assignee: Axel Magnuson DockerJobBundleFactory creates a ProvisioningService that serves the default empty ProvisionInfo, rather than the ProvisionInfo related to a job.
[jira] [Created] (BEAM-4267) Implement a reusable library that can run an ExecutableStage with a given Environment
Axel Magnuson created BEAM-4267: --- Summary: Implement a reusable library that can run an ExecutableStage with a given Environment Key: BEAM-4267 URL: https://issues.apache.org/jira/browse/BEAM-4267 Project: Beam Issue Type: Improvement Components: runner-flink Reporter: Axel Magnuson Assignee: Axel Magnuson Build on the interfaces introduced in [BEAM-3327|https://github.com/apache/beam/pull/5152] to provide a reusable execution library for runners.
[jira] [Comment Edited] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466495#comment-16466495 ] Axel Magnuson edited comment on BEAM-3327 at 5/7/18 9:32 PM: - Fixed by: [https://github.com/apache/beam/pull/5152] This PR only adds interfaces, not implementations. I think that given the amount of work that will be sunk into these interfaces, we should start one or more separate jira tickets for implementation work. was (Author: axelmagn): Fixed by: [[BEAM-3327] Harness Manager Interfaces|https://github.com/apache/beam/pull/5152] This PR only adds interfaces, not implementations. I think that given the amount of work that will be sunk into these interfaces, we should start one or more separate jira tickets for implementation work. > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Fix For: Not applicable > > Time Spent: 29h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments
[jira] [Resolved] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.
[ https://issues.apache.org/jira/browse/BEAM-3327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Axel Magnuson resolved BEAM-3327. - Resolution: Fixed Fix Version/s: Not applicable Fixed by: [[BEAM-3327] Harness Manager Interfaces|https://github.com/apache/beam/pull/5152] This PR only adds interfaces, not implementations. I think that given the amount of work that will be sunk into these interfaces, we should start one or more separate jira tickets for implementation work. > Add abstractions to manage Environment Instance lifecycles. > --- > > Key: BEAM-3327 > URL: https://issues.apache.org/jira/browse/BEAM-3327 > Project: Beam > Issue Type: New Feature > Components: runner-core >Reporter: Thomas Groh >Assignee: Axel Magnuson >Priority: Major > Labels: portability > Fix For: Not applicable > > Time Spent: 29h 10m > Remaining Estimate: 0h > > This permits remote stage execution for arbitrary environments
[jira] [Created] (BEAM-4095) Add abstractions for runners to provide artifacts to ArtifactRetrievalService
Axel Magnuson created BEAM-4095: --- Summary: Add abstractions for runners to provide artifacts to ArtifactRetrievalService Key: BEAM-4095 URL: https://issues.apache.org/jira/browse/BEAM-4095 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Axel Magnuson Assignee: Axel Magnuson For runners on cluster engines, storing and propagating artifacts can be left to the runner. For the ArtifactRetrievalService to serve those artifacts, it needs an abstraction through which the runner can supply them.
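A minimal sketch of such an abstraction: the retrieval service depends only on a small interface, and each runner implements it over its own storage (a distributed cache, a shared file system, and so on). RunnerArtifactSource, InMemoryArtifactSource, and SimpleRetrievalService are hypothetical names chosen for illustration. They are not the interfaces Beam eventually adopted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical runner-facing abstraction over wherever the runner keeps artifacts.
interface RunnerArtifactSource {
  Set<String> listArtifacts();
  byte[] getArtifact(String name);
}

// In-memory implementation, e.g. for tests or a single-process runner.
final class InMemoryArtifactSource implements RunnerArtifactSource {
  private final Map<String, byte[]> artifacts = new HashMap<>();

  InMemoryArtifactSource put(String name, byte[] contents) {
    artifacts.put(name, contents.clone());
    return this;
  }

  @Override
  public Set<String> listArtifacts() {
    // Sorted, read-only view of the artifact names.
    return Collections.unmodifiableSet(new TreeSet<>(artifacts.keySet()));
  }

  @Override
  public byte[] getArtifact(String name) {
    byte[] contents = artifacts.get(name);
    if (contents == null) {
      throw new IllegalArgumentException("No artifact named " + name);
    }
    return contents.clone();
  }
}

// The retrieval side serves whatever the runner's source provides.
final class SimpleRetrievalService {
  private final RunnerArtifactSource source;

  SimpleRetrievalService(RunnerArtifactSource source) {
    this.source = source;
  }

  Set<String> manifest() {
    return source.listArtifacts();
  }

  String readUtf8(String name) {
    return new String(source.getArtifact(name), StandardCharsets.UTF_8);
  }
}
```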
[jira] [Created] (BEAM-3977) Member classes of SdkHarnessClient should have their own files.
Axel Magnuson created BEAM-3977: --- Summary: Member classes of SdkHarnessClient should have their own files. Key: BEAM-3977 URL: https://issues.apache.org/jira/browse/BEAM-3977 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Axel Magnuson Assignee: Axel Magnuson SdkHarnessClient contains quite a few nested classes that could be split out. Of these, BundleProcessor and ActiveBundle have grown into first-class concepts that we interact with just as much as SdkHarnessClient itself.
[jira] [Resolved] (BEAM-3976) SdkHarnessClient is thread-safe
[ https://issues.apache.org/jira/browse/BEAM-3976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Axel Magnuson resolved BEAM-3976. - Resolution: Not A Problem Fix Version/s: Not applicable > SdkHarnessClient is thread-safe > --- > > Key: BEAM-3976 > URL: https://issues.apache.org/jira/browse/BEAM-3976 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Major > Fix For: Not applicable > > > In general, we want to share access to a given SDK harness among multiple > runner workers as a way to amortize container startup and resource costs. > Because control messages are multiplexed over the same shared control > connection, this sharing currently requires external locking on the same > shared lock object. This is error-prone and difficult to verify. The > SdkHarnessClient should use internal mechanisms to provide thread-safety.
[jira] [Commented] (BEAM-3976) SdkHarnessClient is thread-safe
[ https://issues.apache.org/jira/browse/BEAM-3976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16420943#comment-16420943 ] Axel Magnuson commented on BEAM-3976: - After investigating and discussing with Thomas Groh, I found that this has already been fixed: the SdkHarnessClient should be thread-safe. See line 57 in: [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java] > SdkHarnessClient is thread-safe > --- > > Key: BEAM-3976 > URL: https://issues.apache.org/jira/browse/BEAM-3976 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Major > > In general, we want to share access to a given SDK harness among multiple > runner workers as a way to amortize container startup and resource costs. > Because control messages are multiplexed over the same shared control > connection, this sharing currently requires external locking on the same > shared lock object. This is error-prone and difficult to verify. The > SdkHarnessClient should use internal mechanisms to provide thread-safety.
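The FnApiControlClient code referenced above is not reproduced here. As a general illustration of the technique the issue asks for (internal synchronization instead of a caller-held lock), here is a minimal sketch:

- each request gets a unique instruction ID and a pending future,
- writes to the shared connection are serialized inside the client, and
- responses are routed back by ID.

MultiplexingControlClient and its `outbound` BiConsumer are hypothetical stand-ins for the gRPC control stream. The internal lock reflects the fact that a single gRPC StreamObserver must not be written to from multiple threads concurrently.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// A control client that is safe to share between threads without external locking.
final class MultiplexingControlClient {
  private final AtomicLong nextId = new AtomicLong();
  private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
  private final Object sendLock = new Object();
  // Hypothetical outbound stream taking (instructionId, payload); assumed not thread-safe.
  private final BiConsumer<String, String> outbound;

  MultiplexingControlClient(BiConsumer<String, String> outbound) {
    this.outbound = outbound;
  }

  CompletableFuture<String> send(String payload) {
    String id = Long.toString(nextId.incrementAndGet());
    CompletableFuture<String> response = new CompletableFuture<>();
    // Register before sending so that even an immediate reply finds its future.
    pending.put(id, response);
    synchronized (sendLock) {
      outbound.accept(id, payload);
    }
    return response;
  }

  // Called by the connection's reader when the harness answers an instruction.
  void onResponse(String instructionId, String result) {
    CompletableFuture<String> response = pending.remove(instructionId);
    if (response != null) {
      response.complete(result);
    }
  }
}
```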
[jira] [Created] (BEAM-3882) Make StateRequestHandler::accept interface idiomatic.
Axel Magnuson created BEAM-3882: --- Summary: Make StateRequestHandler::accept interface idiomatic. Key: BEAM-3882 URL: https://issues.apache.org/jira/browse/BEAM-3882 Project: Beam Issue Type: Improvement Components: runner-core Reporter: Axel Magnuson Assignee: Axel Magnuson StateRequestHandler was based on some Dataflow SDK source code that does not conform to Beam coding conventions. In particular, the accept method takes its return value as a parameter, which is a code smell.
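The issue does not quote the exact signature of accept, so the "before" shape here is an assumption: the caller passes in a future that the handler must complete. The sketch contrasts that shape with a handler that returns a CompletionStage, and it includes an adapter for migrating existing handlers. All type and method names are hypothetical simplifications, not Beam's Fn API types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Simplified, hypothetical stand-ins for the Fn API state messages.
final class SimpleStateRequest {
  final String key;
  SimpleStateRequest(String key) { this.key = key; }
}

final class SimpleStateResponse {
  final String value;
  SimpleStateResponse(String value) { this.value = value; }
}

// Shape criticized in the issue: the result travels through an output parameter.
interface OutParamStateHandler {
  void accept(SimpleStateRequest request, CompletableFuture<SimpleStateResponse> result);
}

// More idiomatic shape: the handler returns its (possibly asynchronous) result.
interface ReturningStateHandler {
  CompletionStage<SimpleStateResponse> handle(SimpleStateRequest request);

  // Adapter so existing output-parameter handlers keep working during a migration.
  static ReturningStateHandler adapt(OutParamStateHandler legacy) {
    return request -> {
      CompletableFuture<SimpleStateResponse> result = new CompletableFuture<>();
      legacy.accept(request, result);
      return result;
    };
  }
}
```

Returning the stage lets callers compose on it directly (for example with thenApply) instead of allocating a future before each call.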
[jira] [Assigned] (BEAM-3676) FlinkRunner: Portable state service
[ https://issues.apache.org/jira/browse/BEAM-3676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Axel Magnuson reassigned BEAM-3676: --- Assignee: Axel Magnuson (was: Aljoscha Krettek) > FlinkRunner: Portable state service > --- > > Key: BEAM-3676 > URL: https://issues.apache.org/jira/browse/BEAM-3676 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Axel Magnuson >Priority: Major > > The State API is an implementation of BeamFnState that exposes pipeline state > to SDK harnesses. Because it is used for side inputs, this service will also > need to be tied into side inputs/outputs during the translation phase.
[jira] [Created] (BEAM-3805) Python ULR Harness submits null pipeline_options field in PrepareJobRequest
Axel Magnuson created BEAM-3805: --- Summary: Python ULR Harness submits null pipeline_options field in PrepareJobRequest Key: BEAM-3805 URL: https://issues.apache.org/jira/browse/BEAM-3805 Project: Beam Issue Type: Bug Components: sdk-py-harness Reporter: Axel Magnuson Assignee: Robert Bradshaw The beam_job_api gRPC definition indicates that pipeline_options is a required field. However, the Python ULR currently does not set an options struct. This is easy to work around on the job server side by instantiating a default, but there may be code paths where the job server assumes the field is set and fails with an NPE during translation.
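The job-server-side workaround amounts to normalizing a missing options field to a default once, at the service boundary, so that translation code never dereferences null. This is a minimal sketch, assuming options are modeled as a nullable map rather than the protobuf Struct that the job API actually uses. PrepareRequest and JobServerDefaults are hypothetical names. With generated protobuf Java classes, an unset message field reads back as its default instance, so a real presence check would use the generated has-accessor rather than a null check.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified prepare request; pipelineOptions may be null if a client omits it.
final class PrepareRequest {
  final String jobName;
  final Map<String, Object> pipelineOptions;

  PrepareRequest(String jobName, Map<String, Object> pipelineOptions) {
    this.jobName = jobName;
    this.pipelineOptions = pipelineOptions;
  }
}

final class JobServerDefaults {
  // Substitute an empty default once, so downstream code can assume options exist.
  static Map<String, Object> optionsOrDefault(PrepareRequest request) {
    return request.pipelineOptions == null
        ? Collections.<String, Object>emptyMap()
        : request.pipelineOptions;
  }
}
```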
[jira] [Commented] (BEAM-3672) FlinkRunner: Implement an Artifact service using the Flink DistributedCache
[ https://issues.apache.org/jira/browse/BEAM-3672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386828#comment-16386828 ] Axel Magnuson commented on BEAM-3672: - Although this is mostly implemented, the current artifact service handles artifact names naively and would likely fail when given names that are not legal file paths. We plan to replace this path resolution with a filepath-safe encoding such as Base64, but this is a low-priority item that has not been tackled yet. Known affected files are: # /runners/local-artifact-service-java/src/main/java/* # /runners/flink/src/main/java/org/apache/beam/runners/flink/execution/FlinkArtifactSource.java # /runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkCachedArtifactPaths.java > FlinkRunner: Implement an Artifact service using the Flink DistributedCache > --- > > Key: BEAM-3672 > URL: https://issues.apache.org/jira/browse/BEAM-3672 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Ben Sidhom >Assignee: Aljoscha Krettek >Priority: Major > > We need to have a DistributedCache-based artifact service to ship with the > portable Flink runner. The DistributedCache is a perfect fit for Flink > because it comes for free and is the mechanism that Flink already uses to > distribute its own artifacts. > > The final artifact service implementation should be pluggable, but using the > DistributedCache allows the Flink runner to work without additional external > dependencies (beyond perhaps Docker).
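A minimal sketch of the planned encoding, assuming artifact names are arbitrary UTF-8 strings. It uses the JDK's URL- and filename-safe Base64 alphabet, which avoids `/` and `+`, and it drops `=` padding. ArtifactPathCodec is a hypothetical name, not a class in the affected files.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Maps arbitrary artifact names to file names that contain only A-Z, a-z, 0-9, '-' and '_',
// so names like "../x" or "a/b" can no longer escape or restructure the staging directory.
final class ArtifactPathCodec {
  private static final Base64.Encoder ENCODER = Base64.getUrlEncoder().withoutPadding();
  private static final Base64.Decoder DECODER = Base64.getUrlDecoder();

  static String toFileName(String artifactName) {
    return ENCODER.encodeToString(artifactName.getBytes(StandardCharsets.UTF_8));
  }

  // The JDK decoder accepts input without padding, so this round-trips toFileName.
  static String fromFileName(String fileName) {
    return new String(DECODER.decode(fileName), StandardCharsets.UTF_8);
  }
}
```

Two caveats apply to this choice. First, Base64 output is case-sensitive, so on a case-insensitive file system two distinct names could collide; lowercase hex avoids that at the cost of longer names. Second, encoded names grow by about a third, which matters near per-component file name length limits.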