[ https://issues.apache.org/jira/browse/BEAM-13399?focusedWorklogId=719783&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-719783 ]
ASF GitHub Bot logged work on BEAM-13399:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Feb/22 00:21
            Start Date: 03/Feb/22 00:21
    Worklog Time Spent: 10m
      Work Description:
lostluck commented on a change in pull request #16671:
URL: https://github.com/apache/beam/pull/16671#discussion_r798121059


##########
File path: sdks/go/pkg/beam/transforms/sql/sql.go
##########
@@ -114,6 +116,9 @@ func Transform(s beam.Scope, query string, opts ...Option) beam.PCollection {
 	if o.expansionAddr != "" {
 		expansionAddr = xlangx.Require(o.expansionAddr)
 	}
+	if expansionAddr == sqlx.DefaultExpansionAddr {
+		expansionAddr = xlangx.UseAutomatedExpansionService(serviceGradleTarget)

Review comment:
   Agreed that it's awkward, but sqlx is to keep things out of the user space package. We can do a once over on the documentation in the subsequent cleanups.


##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -140,3 +142,67 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	}
 	return res, nil
 }
+
+func startAutomatedExpansionService(gradleTarget string) (func() error, string, error) {
+	jarPath, err := expansionx.GetBeamJar(gradleTarget, core.SdkVersion)
+	if err != nil {
+		return nil, "", err
+	}
+	serviceRunner, err := expansionx.NewExpansionServiceRunner(jarPath, "")
+	if err != nil {
+		return nil, "", err
+	}
+	err = serviceRunner.StartService()
+	if err != nil {
+		return nil, "", err
+	}
+	return serviceRunner.StopService, serviceRunner.Endpoint(), nil
+}
+
+// QueryAutomatedExpansionService submits an external transform to be expanded by the
+// expansion service and then eagerly materializes the artifacts for staging. The given
+// transform should be the external transform, and the components are any additional
+// components necessary for the pipeline snippet.
+//
+// The address to be queried is determined by the Config field of the HandlerParams after
+// the prefix tag indicating the automated service is in use.
+func QueryAutomatedExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error) {
+	// Strip auto: tag to get Gradle target
+	tag, target := parseAddr(p.Config)
+
+	stopFunc, address, err := startAutomatedExpansionService(target)
+	if err != nil {
+		return nil, err
+	}
+	defer stopFunc()
+
+	p.Config = address
+
+	res, err := QueryExpansionService(ctx, p)
+	if err != nil {
+		return nil, err
+	}
+
+	exp := &graph.ExpandedTransform{
+		Components:   res.GetComponents(),
+		Transform:    res.GetTransform(),
+		Requirements: res.GetRequirements(),
+	}
+
+	p.ext.Expanded = exp
+	// Put correct expansion address into edge
+	p.edge.External.ExpansionAddr = address

Review comment:
   A subsequent PR can probably just add a flag to the graph.External node in graph, that we can check to prevent the later materialization.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 719783)
    Time Spent: 23h 10m  (was: 23h)

> [Go SDK] Add automated expansion service start up feature
> ---------------------------------------------------------
>
>                 Key: BEAM-13399
>                 URL: https://issues.apache.org/jira/browse/BEAM-13399
>             Project: Beam
>          Issue Type: Improvement
>          Components: cross-language, sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P2
>          Time Spent: 23h 10m
>  Remaining Estimate: 0h
>
> Add Go SDK support for automated expansion service start-up if one is not
> provided, following the example of the Python implementation.
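
For readers following the "Strip auto: tag to get Gradle target" step in the diff above, here is a minimal standalone sketch of how a tagged config string could be split. It is not the xlangx implementation: the helper name splitAutoTag, the autoTag constant, and the example target string are all hypothetical, and the real parseAddr may behave differently. The sketch only checks for a known prefix, so that a plain host:port address is not mistaken for a tagged one.

```go
package main

import (
	"fmt"
	"strings"
)

// autoTag is a hypothetical prefix marking a config string as a Gradle
// target rather than a host:port address. The name is an assumption made
// for this sketch, not taken from the Beam source.
const autoTag = "auto:"

// splitAutoTag reports whether cfg carries the auto tag and, if so,
// returns the remainder (the Gradle target). Untagged input is returned
// unchanged with ok == false.
func splitAutoTag(cfg string) (target string, ok bool) {
	if !strings.HasPrefix(cfg, autoTag) {
		return cfg, false
	}
	return strings.TrimPrefix(cfg, autoTag), true
}

func main() {
	// "example:gradle:target" is a placeholder, not a real Beam target.
	for _, cfg := range []string{"auto:example:gradle:target", "localhost:8097"} {
		target, ok := splitAutoTag(cfg)
		fmt.Printf("cfg=%q tagged=%v rest=%q\n", cfg, ok, target)
	}
}
```

Matching on the full prefix rather than splitting at the first colon is a deliberate choice here, since both Gradle targets and network addresses contain colons.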
-- This message was sent by Atlassian Jira (v8.20.1#820001)