[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=189144=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189144 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 23/Jan/19 19:08 Start Date: 23/Jan/19 19:08 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 189144) Time Spent: 5h 50m (was: 5h 40m) > initial support for portable api in samza runner > > > Key: BEAM-6271 > URL: https://issues.apache.org/jira/browse/BEAM-6271 > Project: Beam > Issue Type: Task > Components: runner-samza >Reporter: Hai Lu >Assignee: Hai Lu >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > > Support portable api in Samza runner. > This ticket tracks the initial effort to support portable api in Samza > runner, including job server inside samza runner, config translation for > portable pipeline, transform translation for portable pipeline, refactor of > existing codes to merge logic of portable api and java api as much as > possible, etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=189143=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189143 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 23/Jan/19 19:08 Start Date: 23/Jan/19 19:08 Worklog Time Spent: 10m Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#issuecomment-456927471 I am going to merge the changes in since we don't want to hold this for long. Feel free to add comments anyway and we will address them in future patches. Issue Time Tracking --- Worklog Id: (was: 189143) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=188647=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188647 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 23/Jan/19 04:13 Start Date: 23/Jan/19 04:13 Worklog Time Spent: 10m Work Description: lhaiesp commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#issuecomment-456663737 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 188647) Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187132 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 23:01 Start Date: 18/Jan/19 23:01 Worklog Time Spent: 10m Work Description: lhaiesp commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#issuecomment-455715619 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 187132) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187038 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 21:14 Start Date: 18/Jan/19 21:14 Worklog Time Spent: 10m Work Description: lhaiesp commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#issuecomment-455689548 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 187038) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187012=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187012 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 19:41 Start Date: 18/Jan/19 19:41 Worklog Time Spent: 10m Work Description: lhaiesp commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#issuecomment-455664380 Run Java PreCommit Issue Time Tracking --- Worklog Id: (was: 187012) Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187011 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 19:39 Start Date: 18/Jan/19 19:39 Worklog Time Spent: 10m Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321 Initial support for the portable API in the Samza runner. Among other changes, this includes a job server inside the Samza runner, config translation and transform translation for portable pipelines, and a refactoring of existing code to share as much logic as possible between the portable API and the Java API. The change does not yet support:
- batching the events transferred between the Java runner and SDK workers (bundling)
- multiple SDK workers connecting to one Java runner
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187009 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 19:37 Start Date: 18/Jan/19 19:37 Worklog Time Spent: 10m Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321 Issue Time Tracking --- Worklog Id: (was: 187009) Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187004=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187004 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 19:12 Start Date: 18/Jan/19 19:12 Worklog Time Spent: 10m Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r249155184
## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ##
@@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }

+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
Review comment: Hmm, this would tie in with the work to test "multiple sdk workers connecting to one java runner" as well as replacing "SingleEnvironmentInstanceJobBundleFactory". I think it's better to group those efforts together.
Issue Time Tracking --- Worklog Id: (was: 187004) Time Spent: 4.5h (was: 4h 20m)
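The closeFnServer helper in the diff above uses try-with-resources purely for its side effect: leaving the empty block closes the server, and any exception thrown by close() is logged and dropped because the runner is already shutting down. The following is a minimal, self-contained Java sketch of that pattern. FakeServer is a hypothetical stand-in for GrpcFnServer, and collecting messages in a list instead of calling LOG.error is an assumption made so that the behavior can be observed.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "close quietly during shutdown" pattern; FakeServer is hypothetical.
class QuietCloseSketch {

  /** Hypothetical stand-in for a server whose close() may fail. */
  static final class FakeServer implements AutoCloseable {
    private final boolean failOnClose;
    boolean closed = false;

    FakeServer(boolean failOnClose) {
      this.failOnClose = failOnClose;
    }

    @Override
    public void close() throws Exception {
      closed = true;
      if (failOnClose) {
        throw new IllegalStateException("simulated failure while closing");
      }
    }
  }

  /** Collects what would normally be written with LOG.error. */
  static final List<String> errors = new ArrayList<>();

  /** Closes the resource; a failure is recorded but never propagated. */
  static void closeQuietly(AutoCloseable resource) {
    // If resource is null, try-with-resources simply skips the close() call.
    try (AutoCloseable closer = resource) {
      // nothing to do: leaving this block invokes closer.close()
    } catch (Exception e) {
      errors.add("Failed to close server, ignoring during shutdown: " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    FakeServer healthy = new FakeServer(false);
    FakeServer broken = new FakeServer(true);
    closeQuietly(healthy);
    closeQuietly(broken);
    closeQuietly(null); // e.g. a field that was already closed and set to null
    System.out.println(healthy.closed + " " + broken.closed + " " + errors.size()); // prints "true true 1"
  }
}
```

Because a null resource is skipped rather than dereferenced, calling such a helper again after the server fields have been set to null (as close() does in the diff) is harmless.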
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186984=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186984 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 18:11 Start Date: 18/Jan/19 18:11 Worklog Time Spent: 10m Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r249136978
## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java ##
@@ -66,21 +68,48 @@ public void translate(
     }

     if (inputStreams.size() == 1) {
-      ctx.registerMessageStream(output, inputStreams.get(0));
+      ctx.registerMessageStream(output, Iterables.getOnlyElement(inputStreams));
       return;
     }

+    ctx.registerMessageStream(output, mergeInputStreams(inputStreams));
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    final List>> inputStreams = ctx.getAllInputMessageStreams(transform);
+    final String outputId = ctx.getOutputId(transform);
+
+    if (inputStreams.isEmpty()) {
+      // For portable api there should be at least the impulse as a dummy input
+      // We will know once validateRunner tests are available for portable runners
+      throw new IllegalArgumentException(
+          String.format("no input streams defined for Flatten: %s", transform.getId()));
+    }
+
+    if (inputStreams.size() == 1) {
Review comment: good point
Issue Time Tracking --- Worklog Id: (was: 186984) Time Spent: 4h 20m (was: 4h 10m)
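The Flatten translator shown above special-cases its inputs: a single input stream is registered as the output directly, several inputs are merged, and on the portable path an empty input list is rejected. The sketch below restates that control flow in plain Java, with lists standing in for Samza MessageStreams. That substitution is an assumption for illustration only, and list concatenation merely approximates a stream merge, which makes no ordering promise across inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java restatement of the Flatten translation control flow; lists stand in for
// Samza MessageStreams (an assumption for illustration only).
class FlattenSketch {

  /** Picks the output "stream" for a Flatten with the given input streams. */
  static <T> List<T> flatten(String transformId, List<List<T>> inputStreams) {
    if (inputStreams.isEmpty()) {
      // Portable path: a Flatten without inputs is treated as an error.
      throw new IllegalArgumentException(
          String.format("no input streams defined for Flatten: %s", transformId));
    }
    if (inputStreams.size() == 1) {
      // Single input: reuse it as the output, no merge operator needed.
      return inputStreams.get(0);
    }
    // Several inputs: combine them into one output (concatenation here; a real
    // stream merge interleaves elements in no particular order).
    List<T> merged = new ArrayList<>();
    for (List<T> stream : inputStreams) {
      merged.addAll(stream);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> a = Arrays.asList("a1", "a2");
    List<String> b = Collections.singletonList("b1");
    System.out.println(flatten("flatten-1", Collections.singletonList(a)) == a); // true
    System.out.println(flatten("flatten-2", Arrays.asList(a, b))); // [a1, a2, b1]
  }
}
```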
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186983=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186983 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 18:07 Start Date: 18/Jan/19 18:07 Worklog Time Spent: 10m Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r249135944
## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ##
@@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }

+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
+                try {
+                  final long waitTimeoutMs =
+                      SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+                  final InstructionRequestHandler instructionHandler =
+                      controlClientPool
+                          .getSource()
+                          .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
+                  final EnvironmentFactory environmentFactory =
+                      environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
+                  // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+                  jobBundleFactory =
+                      SingleEnvironmentInstanceJobBundleFactory.create(
+                          environmentFactory, fnDataServer, fnStateServer);
+                } catch (Exception e) {
+                  throw new RuntimeException(
+                      "Running samza in Beam portable mode but failed to create job bundle factory",
+                      e);
+                }
+                executionContext.setJobBundleFactory(jobBundleFactory);
+              }
+            }
+
+            context.setUserContext(executionContext);
+          }
+
+          @Override
+          public void close() {
+            closeFnServer(fnControlServer);
+            fnControlServer = null;
+            closeFnServer(fnDataServer);
+            fnDataServer = null;
+            closeFnServer(fnStateServer);
+            fnStateServer = null;
+          }
+        });
+  }
+
+  private void setUpFnApiServer() {
+    controlClientPool = MapControlClientPool.create();
+    ExecutorService dataExecutor = Executors.newCachedThreadPool();
+    try {
+      fnControlServer =
+          GrpcFnServer.allocatePortAndCreateFor(
+              FnApiControlClientPoolService.offeringClientsToPool(
+                  controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
+              ServerFactory.createWithPortSupplier(
+                  () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+
+      fnDataServer =
+          GrpcFnServer.allocatePortAndCreateFor(
+              GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
+              ServerFactory.createDefault());
+
+      fnStateServer =
+          GrpcFnServer.allocatePortAndCreateFor(
+              GrpcStateService.create(), ServerFactory.createDefault());
+    } catch (Exception e) {
+      LOG.error("Failed to set up fn api servers", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
+    final SamzaExecutionContext executionContext = new SamzaExecutionContext();
+    setUpFnApiServer();
Review comment: As discussed, put a TODO here as it's already done in 1.0 migrate
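In setUpFnApiServer, the control service offers connected SDK harness clients to controlClientPool.getSink() under a worker id, and the context manager later calls getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs)) to wait for that client. The Java sketch below illustrates such a put/take-with-timeout handshake using one CompletableFuture per key. KeyedClientPoolSketch is hypothetical and is not Beam's MapControlClientPool. The worker ids, client values, and timeouts in the example are made up.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical keyed pool illustrating a put / take-with-timeout handshake.
// This is NOT Beam's MapControlClientPool; names and semantics are assumptions.
class KeyedClientPoolSketch<T> {
  // One future per worker id; whichever side (put or take) arrives first creates it.
  private final ConcurrentMap<String, CompletableFuture<T>> slots = new ConcurrentHashMap<>();

  private CompletableFuture<T> slot(String workerId) {
    return slots.computeIfAbsent(workerId, id -> new CompletableFuture<>());
  }

  /** Sink side: called when a worker's control connection arrives. */
  void put(String workerId, T client) {
    slot(workerId).complete(client);
  }

  /** Source side: blocks until the worker's client is offered or the timeout expires. */
  T take(String workerId, Duration timeout)
      throws InterruptedException, ExecutionException, TimeoutException {
    T client = slot(workerId).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    slots.remove(workerId); // hand each offered client out only once
    return client;
  }

  public static void main(String[] args) throws Exception {
    KeyedClientPoolSketch<String> pool = new KeyedClientPoolSketch<>();
    // Simulate the SDK worker connecting from another thread.
    new Thread(() -> pool.put("worker-1", "control-client-1")).start();
    System.out.println(pool.take("worker-1", Duration.ofSeconds(5)));
    try {
      pool.take("worker-2", Duration.ofMillis(50));
    } catch (TimeoutException e) {
      System.out.println("timed out waiting for worker-2");
    }
  }
}
```

Keying the waiting side on a future lets take() start before or after put() without losing the client, and the timeout turns a worker that never connects into an error instead of a hang.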
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186669=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186669 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 18/Jan/19 02:14 Start Date: 18/Jan/19 02:14 Worklog Time Spent: 10m Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r248908054
## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ##
@@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }

+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
+                try {
+                  final long waitTimeoutMs =
+                      SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+                  final InstructionRequestHandler instructionHandler =
+                      controlClientPool
+                          .getSource()
+                          .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
+                  final EnvironmentFactory environmentFactory =
+                      environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
+                  // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+                  jobBundleFactory =
+                      SingleEnvironmentInstanceJobBundleFactory.create(
+                          environmentFactory, fnDataServer, fnStateServer);
+                } catch (Exception e) {
+                  throw new RuntimeException(
+                      "Running samza in Beam portable mode but failed to create job bundle factory",
+                      e);
+                }
+                executionContext.setJobBundleFactory(jobBundleFactory);
+              }
+            }
+
+            context.setUserContext(executionContext);
+          }
+
+          @Override
+          public void close() {
+            closeFnServer(fnControlServer);
+            fnControlServer = null;
+            closeFnServer(fnDataServer);
+            fnDataServer = null;
+            closeFnServer(fnStateServer);
+            fnStateServer = null;
+          }
+        });
+  }
+
+  private void setUpFnApiServer() {
+    controlClientPool = MapControlClientPool.create();
Review comment: The pool doesn't have a way to close, so setting the variable to null is the best we can do. We do need to shut down the executorService, though. Good point.
Issue Time Tracking --- Worklog Id: (was: 186669) Time Spent: 4h (was: 3h 50m)
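The reply above notes that the control client pool cannot be closed, whereas the ExecutorService created with Executors.newCachedThreadPool() for the data server does need an explicit shutdown: its threads are non-daemon, and idle ones linger for 60 seconds before exiting. The following is a minimal sketch of such a shutdown. The class name, the accessor, and the 5-second grace period are assumptions for illustration, not the code that was merged.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of releasing the data-plane executor on shutdown. The class, the accessor
// and the 5-second grace period are assumptions for illustration.
class ExecutorShutdownSketch {
  // Created the same way as dataExecutor in setUpFnApiServer.
  private ExecutorService dataExecutor = Executors.newCachedThreadPool();

  ExecutorService executor() {
    return dataExecutor;
  }

  /** Stops accepting work, waits briefly, then interrupts whatever is still running. */
  void close() throws InterruptedException {
    ExecutorService executor = dataExecutor;
    dataExecutor = null; // as with the unclosable client pool, drop the reference
    if (executor == null) {
      return; // already closed
    }
    executor.shutdown(); // no new tasks; already-submitted tasks still run
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
      executor.shutdownNow(); // interrupt tasks that did not finish in time
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorShutdownSketch sketch = new ExecutorShutdownSketch();
    ExecutorService executor = sketch.executor();
    executor.submit(() -> System.out.println("data-plane task ran"));
    sketch.close();
    System.out.println("terminated: " + executor.isTerminated());
  }
}
```

Calling shutdown() before shutdownNow() gives in-flight data-plane tasks a chance to finish cleanly. Forcing interruption only after the grace period keeps shutdown bounded.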
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186663=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186663 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 18/Jan/19 02:04
Start Date: 18/Jan/19 02:04
Worklog Time Spent: 10m
Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248906805

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
+    private int jobPort = 11440;
+
+    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
+    private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ServerConfiguration configuration = new ServerConfiguration();

Review comment: Done.

Issue Time Tracking
---
Worklog Id: (was: 186663)
Time Spent: 3.5h (was: 3h 20m)
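The driver relies on args4j `@Option` fields that default to 11440 (`--job-port`) and 11441 (`--control-port`). When parsing fails, `main` rethrows the failure as an `IllegalArgumentException`. The sketch below reproduces that behavior with only the JDK so it can run without args4j. `PortFlags` and `parse` are hypothetical names, and accepting only the space-separated `--flag value` form is an assumption, not a description of args4j's full grammar.

```java
import java.util.Arrays;

/** Hypothetical JDK-only stand-in for the args4j-backed ServerConfiguration. */
final class PortFlags {
  // Defaults mirror the @Option usage strings in SamzaJobServerDriver.
  int jobPort = 11440;
  int controlPort = 11441;

  /** Parses "--job-port N" / "--control-port N" pairs; anything else is rejected. */
  static PortFlags parse(String[] args) {
    PortFlags flags = new PortFlags();
    for (int i = 0; i < args.length; i += 2) {
      String name = args[i];
      boolean known = name.equals("--job-port") || name.equals("--control-port");
      if (!known || i + 1 >= args.length) {
        throw usageError(args, null);
      }
      int value;
      try {
        value = Integer.parseInt(args[i + 1]);
      } catch (NumberFormatException e) {
        throw usageError(args, e);
      }
      if (name.equals("--job-port")) {
        flags.jobPort = value;
      } else {
        flags.controlPort = value;
      }
    }
    return flags;
  }

  // Mirrors the driver, which reports parse failures as IllegalArgumentException.
  private static IllegalArgumentException usageError(String[] args, Throwable cause) {
    return new IllegalArgumentException(
        "Unable to parse command line arguments " + Arrays.asList(args), cause);
  }
}
```

For example, `PortFlags.parse(new String[] {"--control-port", "12000"})` keeps the job port at its default of 11440 and overrides only the control port.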
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186665 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 18/Jan/19 02:08
Start Date: 18/Jan/19 02:08
Worklog Time Spent: 10m
Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248907312

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
+    private int jobPort = 11440;
+
+    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
+    private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+      fromConfig(configuration).run();
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), e);
+      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+    } catch (Exception e) {
+      LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
+      throw e;
+    }
+  }
+
+  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
+    return new SamzaJobServerDriver(config);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {
+    JobInvoker jobInvoker =
+        new JobInvoker() {
+          @Override
+          public JobInvocation invoke(
+              RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+              throws IOException {
+            SamzaPipelineOptions samzaPipelineOptions =
+                PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
+            Map overrideConfig =
+                samzaPipelineOptions.getConfigOverride() != null
+                    ? samzaPipelineOptions.getConfigOverride()
+                    : new HashMap<>();
+            overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, String.valueOf(true));
+            overrideConfig.put(
+                SamzaRunnerOverrideConfigs.FN_CONTROL_PORT, String.valueOf(config.controlPort));
+            samzaPipelineOptions.setConfigOverride(overrideConfig);
+            return new
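The `invoke` method starts from the user's config override, or an empty map if there is none, and then forces portable mode and the FnControl port. Below is a minimal JDK-only sketch of that merge.

- The two key strings are hypothetical placeholders. The real values of `SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE` and `FN_CONTROL_PORT` are not shown in this thread.
- Unlike the diff, the sketch copies into a fresh `HashMap`. This is a deliberate choice so that an unmodifiable override map from the caller cannot make `put` fail.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical JDK-only sketch of the override-config merge done in invoke(). */
final class PortableOverrides {
  // Placeholder keys standing in for the SamzaRunnerOverrideConfigs constants.
  static final String IS_PORTABLE_MODE = "example.portable.mode";
  static final String FN_CONTROL_PORT = "example.fn.control.port";

  /** Returns a new map: the user's overrides (if any) plus the two portable-mode entries. */
  static Map<String, String> withPortableSettings(
      Map<String, String> userOverride, int controlPort) {
    // Copy instead of mutating the caller's map; a null override behaves like an empty one.
    Map<String, String> merged =
        userOverride != null ? new HashMap<>(userOverride) : new HashMap<>();
    merged.put(IS_PORTABLE_MODE, String.valueOf(true));
    merged.put(FN_CONTROL_PORT, String.valueOf(controlPort));
    return merged;
  }
}
```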
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186664=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186664 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 18/Jan/19 02:08
Start Date: 18/Jan/19 02:08
Worklog Time Spent: 10m
Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248907238

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
+    private int jobPort = 11440;
+
+    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
+    private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+      fromConfig(configuration).run();
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), e);
+      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+    } catch (Exception e) {
+      LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
+      throw e;
+    }
+  }
+
+  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
+    return new SamzaJobServerDriver(config);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {

Review comment: Sure. The config would need to become a parameter; either way, it doesn't make much difference.

Issue Time Tracking
---
Worklog Id: (was: 186664)
Time Spent: 3h 40m (was: 3.5h)
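The reply concerns making `createJobService()` independent of the driver instance by passing the configuration in as a parameter. Below is a hypothetical JDK-only sketch of that shape.

- `ServerConfig` is a simplified stand-in for `ServerConfiguration`.
- `Invoker` is a simplified stand-in for Beam's `JobInvoker`. Its method returns a string only so that the captured port is observable.
- None of these types are Beam APIs.

```java
/** Hypothetical JDK-only sketch: the job-service factory takes its config explicitly. */
final class JobServiceFactory {
  /** Simplified stand-in for SamzaJobServerDriver.ServerConfiguration. */
  static final class ServerConfig {
    final int jobPort;
    final int controlPort;

    ServerConfig(int jobPort, int controlPort) {
      this.jobPort = jobPort;
      this.controlPort = controlPort;
    }
  }

  /** Simplified stand-in for JobInvoker; returns a description instead of a JobInvocation. */
  interface Invoker {
    String invoke(String jobName);
  }

  // Static: the lambda captures the parameter rather than a field of an enclosing driver,
  // so the factory can be exercised without constructing the driver at all.
  static Invoker createInvoker(ServerConfig config) {
    return jobName -> jobName + " -> FnControl port " + config.controlPort;
  }
}
```

As the reply notes, either form behaves the same. The static version mainly makes the dependency on the configuration explicit.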
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186662=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186662 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 18/Jan/19 02:04
Start Date: 18/Jan/19 02:04
Worklog Time Spent: 10m
Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248906683

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options) {
+    this.originalPipeline = pipeline;
+    this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+    // Fused pipeline proto.
+    final RunnerApi.Pipeline fusedPipeline =
+        GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+    LOG.info("Portable pipeline to run:");
+    LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+    // the pipeline option coming from sdk will set the sdk specific runner which will break serialization
+    // so we need to reset the runner here to a valid Java runner
+    options.setRunner(SamzaRunner.class);
+    try {
+      final SamzaRunner runner = new SamzaRunner(options);
+      return runner.runPortablePipeline(fusedPipeline);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke samza job", e);
+    }
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Starting job invocation {}", getId());
+    pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
+    return options.getJobName();
+  }
+
+  @Override
+  public void cancel() {
+    try {
+      if (pipelineResult != null) {
+        pipelineResult.cancel();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to cancel job.", e);
+    }
+  }
+
+  @Override
+  public JobApi.JobState.Enum getState() {
+    if (pipelineResult == null) {
+      return STARTING;
+    }
+    switch (pipelineResult.getState()) {

Review comment: Because one is the Java PipelineResult.State and the other is the protobuf version, JobApi.JobState. That's very unfortunate.
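The `switch` in `getState()` exists because Beam has two parallel enums: the Java SDK's `PipelineResult.State` and the protobuf `JobApi.JobState.Enum`. The sketch below shows the shape of such a translation. It is not the code that was merged.

- It uses two local mirror enums so that it compiles with the JDK alone. Their constant lists are abbreviated, hypothetical copies.
- A `null` state maps to `STARTING`, following the `pipelineResult == null` check in the diff.
- Anything unmatched maps to `UNRECOGNIZED`, following the static imports.

```java
/** Abbreviated local mirror of the Java SDK's PipelineResult.State (hypothetical copy). */
enum SdkState { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED }

/** Local mirror of the protobuf JobApi.JobState.Enum values imported in the diff. */
enum ProtoJobState { STARTING, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED, UNRECOGNIZED }

final class JobStates {
  /** Translates an SDK state; null means the job has not produced a result yet. */
  static ProtoJobState toProto(SdkState state) {
    if (state == null) {
      return ProtoJobState.STARTING;
    }
    switch (state) {
      case STOPPED:
        return ProtoJobState.STOPPED;
      case RUNNING:
        return ProtoJobState.RUNNING;
      case DONE:
        return ProtoJobState.DONE;
      case FAILED:
        return ProtoJobState.FAILED;
      case CANCELLED:
        return ProtoJobState.CANCELLED;
      case UPDATED:
        return ProtoJobState.UPDATED;
      default:
        // UNKNOWN (and any state added later) has no direct counterpart.
        return ProtoJobState.UNRECOGNIZED;
    }
  }
}
```

A name-based shortcut such as `ProtoJobState.valueOf(state.name())` would throw for `UNKNOWN`. The explicit `switch` keeps the mapping total.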
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186659=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186659 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 18/Jan/19 01:57
Start Date: 18/Jan/19 01:57
Worklog Time Spent: 10m
Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248905652

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options) {
+    this.originalPipeline = pipeline;
+    this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+    // Fused pipeline proto.
+    final RunnerApi.Pipeline fusedPipeline =
+        GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+    LOG.info("Portable pipeline to run:");
+    LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+    // the pipeline option coming from sdk will set the sdk specific runner which will break serialization
+    // so we need to reset the runner here to a valid Java runner
+    options.setRunner(SamzaRunner.class);
+    try {
+      final SamzaRunner runner = new SamzaRunner(options);
+      return runner.runPortablePipeline(fusedPipeline);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke samza job", e);
+    }
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Starting job invocation {}", getId());
+    pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {

Review comment: It looks like it's only used for logging. But you're right: the semantics do require the ID to be unique across different invocations. Flink uses the job name plus a random UUID, and I'm considering using the same pattern here.

Issue Time Tracking
---
Worklog Id: (was: 186659)
Time Spent: 3h (was: 2h 50m)
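Below is a minimal sketch of the pattern mentioned in the reply, combining the job name with a random UUID. The `_` separator and the `InvocationIds` and `SketchInvocation` classes are assumptions for illustration. The comment does not say exactly how Flink joins the two parts.

```java
import java.util.UUID;

/** Hypothetical helper: a readable but per-invocation-unique ID. */
final class InvocationIds {
  static String newInvocationId(String jobName) {
    // UUID.randomUUID() has 122 random bits, so collisions are practically impossible.
    return String.format("%s_%s", jobName, UUID.randomUUID());
  }
}

/** Hypothetical invocation that fixes its ID once, so getId() is stable for its lifetime. */
final class SketchInvocation {
  private final String id;

  SketchInvocation(String jobName) {
    this.id = InvocationIds.newInvocationId(jobName);
  }

  String getId() {
    return id;
  }
}
```

The ID is computed once in the constructor and stored in a `final` field. Generating it inside `getId()` instead would return a different value on every call, including the call used for logging in `start()`.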
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180454=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180454 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 02/Jan/19 23:22
Start Date: 02/Jan/19 23:22
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-451016084

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 180454)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180453=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180453 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 02/Jan/19 23:21
Start Date: 02/Jan/19 23:21
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244877673

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
##
@@ -66,21 +68,48 @@ public void translate(
     }
     if (inputStreams.size() == 1) {
-      ctx.registerMessageStream(output, inputStreams.get(0));
+      ctx.registerMessageStream(output, Iterables.getOnlyElement(inputStreams));
       return;
     }
+    ctx.registerMessageStream(output, mergeInputStreams(inputStreams));
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    final List>> inputStreams = ctx.getAllInputMessageStreams(transform);
+    final String outputId = ctx.getOutputId(transform);
+
+    if (inputStreams.isEmpty()) {

Review comment: checkState(!inputStreams.isEmpty(), ".")

Issue Time Tracking
---
Worklog Id: (was: 180453)
Time Spent: 2h 40m (was: 2.5h)
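The suggestion replaces the explicit `if`/`throw` with Guava's `Preconditions.checkState`. To stay JDK-only, the sketch below uses a local helper with the same contract: when the condition is false, it throws `IllegalStateException` with a `%s`-formatted message. It is not Guava itself, and `requireInputs` is a hypothetical name. Adopting `checkState` would also change the exception type from the diff's `IllegalArgumentException` to `IllegalStateException`.

```java
import java.util.List;

final class FlattenChecks {
  /** Local stand-in for Guava's Preconditions.checkState(boolean, String, Object...). */
  static void checkState(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalStateException(String.format(template, args));
    }
  }

  /** Hypothetical guard for translatePortable: a Flatten must have at least one input. */
  static <T> void requireInputs(List<T> inputStreams, String transformId) {
    checkState(!inputStreams.isEmpty(), "no input streams defined for Flatten: %s", transformId);
  }
}
```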
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180452=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180452 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 02/Jan/19 23:21
Start Date: 02/Jan/19 23:21
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244882732

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+
+/**
+ * Helper that keeps the mapping from BEAM PCollection id to Samza {@link MessageStream}. It also
+ * provides other context data such as input and output of a {@link
+ * org.apache.beam.model.pipeline.v1.RunnerApi.PTransform}.
+ */
+public class PortableTranslationContext {
+  private final Map> messsageStreams = new HashMap<>();
+  private final StreamGraph streamGraph;
+  private final SamzaPipelineOptions options;
+  private int topologicalId;
+  private final Set registeredInputStreams = new HashSet<>();
+
+  public PortableTranslationContext(StreamGraph streamGraph, SamzaPipelineOptions options) {
+    this.streamGraph = streamGraph;
+    this.options = options;
+  }
+
+  public SamzaPipelineOptions getSamzaPipelineOptions() {
+    return this.options;
+  }
+
+  public void setCurrentTopologicalId(int id) {
+    this.topologicalId = id;
+  }
+
+  public int getCurrentTopologicalId() {
+    return this.topologicalId;
+  }
+
+  public List>> getAllInputMessageStreams(
+      PipelineNode.PTransformNode transform) {
+    final Collection inputStreamIds = transform.getTransform().getInputsMap().values();
+    return inputStreamIds.stream().map(this::getMessageStreamById).collect(Collectors.toList());
+  }
+
+  public MessageStream> getOneInputMessageStream(
+      PipelineNode.PTransformNode transform) {
+    String id = Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+    return getMessageStreamById(id);
+  }
+
+  @SuppressWarnings("unchecked")
+  public MessageStream> getMessageStreamById(String id) {
+    return (MessageStream>) messsageStreams.get(id);
+  }
+
+  public String getInputId(PipelineNode.PTransformNode transform) {
+    return Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+  }
+
+  public String getOutputId(PipelineNode.PTransformNode transform) {
+    return Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
+  }
+
+  public void registerMessageStream(String id, MessageStream> stream) {
+    if (messsageStreams.containsKey(id)) {
+      throw new IllegalArgumentException("Stream already
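`PortableTranslationContext` is essentially a map from PCollection ID to the Samza `MessageStream` that produces that PCollection. Its `registerMessageStream` method refuses an ID that is already registered. The JDK-only sketch below captures just that bookkeeping.

- `StreamRegistry` is a hypothetical name, and the type parameter `S` stands in for `MessageStream`.
- The diff is cut off mid-message, so the exception text here is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the id-to-stream bookkeeping in PortableTranslationContext. */
final class StreamRegistry<S> {
  private final Map<String, S> streams = new HashMap<>();

  /** Binds the stream that produces a PCollection; each ID may be registered only once. */
  void register(String pcollectionId, S stream) {
    if (streams.containsKey(pcollectionId)) {
      throw new IllegalArgumentException("Stream already registered for id: " + pcollectionId);
    }
    streams.put(pcollectionId, stream);
  }

  /** Returns the stream for one ID, or null if it was never registered (as Map.get does). */
  S get(String pcollectionId) {
    return streams.get(pcollectionId);
  }

  /** Resolves all input IDs of a transform, in the iteration order of the given IDs. */
  List<S> getAll(Collection<String> inputIds) {
    List<S> result = new ArrayList<>();
    for (String id : inputIds) {
      result.add(streams.get(id));
    }
    return result;
  }
}
```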
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180451 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 23:21 Start Date: 02/Jan/19 23:21 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244877567 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java ## @@ -66,21 +68,48 @@ public void translate( } if (inputStreams.size() == 1) { - ctx.registerMessageStream(output, inputStreams.get(0)); + ctx.registerMessageStream(output, Iterables.getOnlyElement(inputStreams)); return; } +ctx.registerMessageStream(output, mergeInputStreams(inputStreams)); + } + + @Override + public void translatePortable( + PipelineNode.PTransformNode transform, + QueryablePipeline pipeline, + PortableTranslationContext ctx) { +final List>> inputStreams = ctx.getAllInputMessageStreams(transform); +final String outputId = ctx.getOutputId(transform); + +if (inputStreams.isEmpty()) { + // For portable api there should be at least the impulse as a dummy input + // We will know once validateRunner tests are available for portable runners + throw new IllegalArgumentException( + String.format("no input streams defined for Flatten: %s", transform.getId())); +} + +if (inputStreams.size() == 1) { Review comment: This optimization should be folded inside mergeInputStreams() This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 180451) Time Spent: 2h 20m (was: 2h 10m)
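The review asks for the single-input shortcut to live inside `mergeInputStreams()` so that both `translate` and `translatePortable` get it for free. A minimal sketch of that shape, assuming a generic stream type `S` and a caller-supplied `mergeAll` function standing in for the runner's real merge (both are hypothetical, not the Samza API):

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: the single-input optimization folded into the merge helper. */
final class FlattenSketch {
  private FlattenSketch() {}

  /**
   * Merges the input streams of a Flatten. A single input is returned as-is, so callers
   * no longer need their own size() == 1 check.
   */
  static <S> S mergeInputStreams(List<S> inputStreams, Function<List<S>, S> mergeAll) {
    if (inputStreams.isEmpty()) {
      throw new IllegalArgumentException("no input streams defined for Flatten");
    }
    if (inputStreams.size() == 1) {
      return inputStreams.get(0); // nothing to merge
    }
    return mergeAll.apply(inputStreams);
  }
}
```

Each translator would then reduce to a single `registerMessageStream(outputId, mergeInputStreams(...))` call after its own empty-input check.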
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180330=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180330 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244792611 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.samza; + +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED; + +import java.util.function.Consumer; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Invocation of a Samza job via {@link SamzaRunner}. */ +public class SamzaJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class); + + private final SamzaPipelineOptions options; + private final RunnerApi.Pipeline originalPipeline; + private volatile SamzaPipelineResult pipelineResult; + + public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options) { +this.originalPipeline = pipeline; +this.options = options; + } + + private SamzaPipelineResult invokeSamzaJob() { +// Fused pipeline proto. 
+final RunnerApi.Pipeline fusedPipeline = +GreedyPipelineFuser.fuse(originalPipeline).toPipeline(); +LOG.info("Portable pipeline to run:"); +LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline)); +// the pipeline option coming from sdk will set the sdk specific runner which will break serialization +// so we need to reset the runner here to a valid Java runner +options.setRunner(SamzaRunner.class); +try { + final SamzaRunner runner = new SamzaRunner(options); + return runner.runPortablePipeline(fusedPipeline); +} catch (Exception e) { + throw new RuntimeException("Failed to invoke samza job", e); +} + } + + @Override + public void start() { +LOG.info("Starting job invocation {}", getId()); +pipelineResult = invokeSamzaJob(); + } + + @Override + public String getId() { Review comment: I think we might need to concatenate jobId with jobName here, since Samza jobs can have multiple instances. Also good to check where getId() is being used. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 180330) Time Spent: 1.5h (was: 1h 20m) > initial support for portable api in samza runner > > > Key: BEAM-6271 > URL:
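The concern above is that `getId()` returns only the job name, while Samza distinguishes concurrently running instances of one job through its separate `job.id` setting. One possible shape for a combined id is sketched below; the helper name and the `-` separator are assumptions for illustration, not what the PR does:

```java
/** Hypothetical helper: builds an invocation id that is unique per Samza job instance. */
final class InvocationIds {
  private InvocationIds() {}

  /**
   * Combines job name and job id, since several instances of the same job name may run
   * side by side. The "-" separator is an arbitrary choice for this sketch.
   */
  static String invocationId(String jobName, String jobId) {
    if (jobName == null || jobName.isEmpty()) {
      throw new IllegalArgumentException("job name must be set");
    }
    return (jobId == null || jobId.isEmpty()) ? jobName : jobName + "-" + jobId;
  }
}
```

As the reviewer notes, any such change should be checked against every caller of `getId()`, because the id is also what clients use to look the invocation up.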
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180331=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180331 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244793072 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.samza; + +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED; +import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED; + +import java.util.function.Consumer; +import org.apache.beam.model.jobmanagement.v1.JobApi; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Invocation of a Samza job via {@link SamzaRunner}. */ +public class SamzaJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class); + + private final SamzaPipelineOptions options; + private final RunnerApi.Pipeline originalPipeline; + private volatile SamzaPipelineResult pipelineResult; + + public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options) { +this.originalPipeline = pipeline; +this.options = options; + } + + private SamzaPipelineResult invokeSamzaJob() { +// Fused pipeline proto. 
+final RunnerApi.Pipeline fusedPipeline = +GreedyPipelineFuser.fuse(originalPipeline).toPipeline(); +LOG.info("Portable pipeline to run:"); +LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline)); +// the pipeline option coming from sdk will set the sdk specific runner which will break serialization +// so we need to reset the runner here to a valid Java runner +options.setRunner(SamzaRunner.class); +try { + final SamzaRunner runner = new SamzaRunner(options); + return runner.runPortablePipeline(fusedPipeline); +} catch (Exception e) { + throw new RuntimeException("Failed to invoke samza job", e); +} + } + + @Override + public void start() { +LOG.info("Starting job invocation {}", getId()); +pipelineResult = invokeSamzaJob(); + } + + @Override + public String getId() { +return options.getJobName(); + } + + @Override + public void cancel() { +try { + if (pipelineResult != null) { +pipelineResult.cancel(); + } +} catch (Exception e) { + throw new RuntimeException("Failed to cancel job.", e); +} + } + + @Override + public JobApi.JobState.Enum getState() { +if (pipelineResult == null) { + return STARTING; +} +switch (pipelineResult.getState()) { Review comment: Seems we can directly return pipelineResult here. Any idea why doing this switch? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries
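A likely reason for the switch is that `pipelineResult.getState()` returns the SDK's `PipelineResult.State`, while `JobInvocation.getState()` must return the job-management enum `JobApi.JobState.Enum`. These are unrelated Java types even where constant names coincide, so the value cannot be returned directly. The sketch below uses simplified local stand-ins for both enums (the real `JobApi.JobState.Enum` has more constants):

```java
/** Simplified stand-in for the SDK-side PipelineResult.State. */
enum RunnerState { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED }

/** Simplified stand-in for the job API's JobApi.JobState.Enum, limited to the constants imported above. */
enum JobStateEnum { STARTING, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED, UNRECOGNIZED }

final class StateMapping {
  private StateMapping() {}

  /** Translates a runner-side state into the job-API state, constant by constant. */
  static JobStateEnum toJobState(RunnerState state) {
    switch (state) {
      case STOPPED:
        return JobStateEnum.STOPPED;
      case RUNNING:
        return JobStateEnum.RUNNING;
      case DONE:
        return JobStateEnum.DONE;
      case FAILED:
        return JobStateEnum.FAILED;
      case CANCELLED:
        return JobStateEnum.CANCELLED;
      case UPDATED:
        return JobStateEnum.UPDATED;
      case UNKNOWN:
      default:
        // UNKNOWN has no same-named counterpart on the job API side.
        return JobStateEnum.UNRECOGNIZED;
    }
  }
}
```

A shortcut such as `JobStateEnum.valueOf(state.name())` would throw for `UNKNOWN`, which is one argument for keeping the explicit mapping.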
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180337=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180337 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244794298 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.samza; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.ServerFactory; +import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService; +import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Driver program that starts a job server. */ +public class SamzaJobServerDriver { + private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class); + + private final ServerConfiguration config; + + /** Configuration for the jobServer. */ + private static class ServerConfiguration { +@Option(name = "--job-port", usage = "The job service port. (Default: 11440)") +private int jobPort = 11440; + +@Option(name = "--control-port", usage = "The FnControl port. 
(Default: 11441)") +private int controlPort = 11441; + } + + private SamzaJobServerDriver(ServerConfiguration config) { +this.config = config; + } + + public static void main(String[] args) throws Exception { +ServerConfiguration configuration = new ServerConfiguration(); +CmdLineParser parser = new CmdLineParser(configuration); +try { + parser.parseArgument(args); + fromConfig(configuration).run(); +} catch (CmdLineException e) { + LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), e); + throw new IllegalArgumentException("Unable to parse command line arguments.", e); +} catch (Exception e) { + LOG.error("Hit exception with SamzaJobServer. Exiting...", e); + throw e; +} + } + + public static SamzaJobServerDriver fromConfig(ServerConfiguration config) { +return new SamzaJobServerDriver(config); + } + + private InMemoryJobService createJobService() throws IOException { Review comment: static? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 180337) Time Spent: 2h 10m (was: 1h 50m) > initial support for portable api in samza runner > > > Key: BEAM-6271 > URL: https://issues.apache.org/jira/browse/BEAM-6271 > Project: Beam > Issue Type: Task > Components: runner-samza >Reporter: Hai Lu >Assignee: Hai Lu >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Support portable api in
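On the `static?` question: as quoted later in this thread, the `JobInvoker` built inside `createJobService()` reads `config.controlPort`, an instance field, so the method cannot simply be marked `static` as written. A static variant would need to take the configuration as a parameter. The sketch below shows that refactoring in isolation; `DriverSketch`, `ServerConfig`, and `describeJobService` are hypothetical names:

```java
/** Hypothetical sketch: making an instance method static by passing its state in. */
final class DriverSketch {
  /** Simplified stand-in for ServerConfiguration. */
  static final class ServerConfig {
    final int controlPort;

    ServerConfig(int controlPort) {
      this.controlPort = controlPort;
    }
  }

  private final ServerConfig config;

  DriverSketch(ServerConfig config) {
    this.config = config;
  }

  // Instance method: implicitly depends on this.config, so it cannot be static as-is.
  String describeJobService() {
    return describeJobService(config);
  }

  // Static variant: every dependency is an explicit parameter, which also eases testing.
  static String describeJobService(ServerConfig config) {
    return "fn-control-port=" + config.controlPort;
  }
}
```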
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180332=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180332 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244797460 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ## @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) { this.options = options; } + private void closeFnServer(GrpcFnServer fnServer) { +try (AutoCloseable closer = fnServer) { + // do nothing +} catch (Exception e) { + LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e); +} + } + + private void setUpContextManager( + StreamGraph streamGraph, SamzaExecutionContext executionContext) { +streamGraph.withContextManager( +new ContextManager() { + @Override + public void init(Config config, TaskContext context) { +if (executionContext.getMetricsContainer() == null) { + final MetricsRegistryMap metricsRegistry = + (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry; + executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry)); +} + +if (SamzaRunnerOverrideConfigs.isPortableMode(options)) { + if (jobBundleFactory == null) { +try { + final long waitTimeoutMs = + SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options); + final InstructionRequestHandler instructionHandler = + controlClientPool + .getSource() + .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs)); + final EnvironmentFactory environmentFactory = + environment -> RemoteEnvironment.forHandler(environment, instructionHandler); + // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping + jobBundleFactory = + 
SingleEnvironmentInstanceJobBundleFactory.create( + environmentFactory, fnDataServer, fnStateServer); +} catch (Exception e) { + throw new RuntimeException( + "Running samza in Beam portable mode but failed to create job bundle factory", + e); +} +executionContext.setJobBundleFactory(jobBundleFactory); + } +} + +context.setUserContext(executionContext); + } + + @Override + public void close() { +closeFnServer(fnControlServer); +fnControlServer = null; +closeFnServer(fnDataServer); +fnDataServer = null; +closeFnServer(fnStateServer); +fnStateServer = null; + } +}); + } + + private void setUpFnApiServer() { +controlClientPool = MapControlClientPool.create(); Review comment: Do we need to close this pool, as well as the executorService, during shutdown? Issue Time Tracking --- Worklog Id: (was: 180332) Time Spent: 1h 50m (was: 1h 40m)
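In the quoted `setUpFnApiServer()`, `dataExecutor` is a local variable, so nothing can shut it down later. One way to address the shutdown question is to hold the executor as a field and stop it in `close()` with the standard `shutdown` / `awaitTermination` / `shutdownNow` sequence. The class name and the 5-second timeout are assumptions; whether `MapControlClientPool` itself needs closing depends on its API and is not covered here:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: keep the data executor as a field so close() can shut it down. */
final class FnServerResources implements AutoCloseable {
  private final ExecutorService dataExecutor = Executors.newCachedThreadPool();

  ExecutorService dataExecutor() {
    return dataExecutor;
  }

  @Override
  public void close() throws InterruptedException {
    dataExecutor.shutdown(); // stop accepting new tasks, let running ones finish
    if (!dataExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
      dataExecutor.shutdownNow(); // interrupt tasks that are still running
    }
  }
}
```

In the runner, this would run in the `ContextManager.close()` hook next to the existing `closeFnServer(...)` calls.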
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180334=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180334 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244803102 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ## @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) { this.options = options; } + private void closeFnServer(GrpcFnServer fnServer) { +try (AutoCloseable closer = fnServer) { + // do nothing +} catch (Exception e) { + LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e); +} + } + + private void setUpContextManager( + StreamGraph streamGraph, SamzaExecutionContext executionContext) { +streamGraph.withContextManager( +new ContextManager() { + @Override + public void init(Config config, TaskContext context) { +if (executionContext.getMetricsContainer() == null) { + final MetricsRegistryMap metricsRegistry = + (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry; + executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry)); +} + +if (SamzaRunnerOverrideConfigs.isPortableMode(options)) { + if (jobBundleFactory == null) { +try { + final long waitTimeoutMs = + SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options); + final InstructionRequestHandler instructionHandler = + controlClientPool + .getSource() + .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs)); + final EnvironmentFactory environmentFactory = + environment -> RemoteEnvironment.forHandler(environment, instructionHandler); + // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping + jobBundleFactory = + 
SingleEnvironmentInstanceJobBundleFactory.create( + environmentFactory, fnDataServer, fnStateServer); +} catch (Exception e) { + throw new RuntimeException( + "Running samza in Beam portable mode but failed to create job bundle factory", + e); +} +executionContext.setJobBundleFactory(jobBundleFactory); + } +} + +context.setUserContext(executionContext); + } + + @Override + public void close() { +closeFnServer(fnControlServer); +fnControlServer = null; +closeFnServer(fnDataServer); +fnDataServer = null; +closeFnServer(fnStateServer); +fnStateServer = null; + } +}); + } + + private void setUpFnApiServer() { +controlClientPool = MapControlClientPool.create(); +ExecutorService dataExecutor = Executors.newCachedThreadPool(); +try { + fnControlServer = + GrpcFnServer.allocatePortAndCreateFor( + FnApiControlClientPoolService.offeringClientsToPool( + controlClientPool.getSink(), () -> SAMZA_WORKER_ID), + ServerFactory.createWithPortSupplier( + () -> SamzaRunnerOverrideConfigs.getFnControlPort(options))); + + fnDataServer = + GrpcFnServer.allocatePortAndCreateFor( + GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()), + ServerFactory.createDefault()); + + fnStateServer = + GrpcFnServer.allocatePortAndCreateFor( + GrpcStateService.create(), ServerFactory.createDefault()); +} catch (Exception e) { + LOG.error("Failed to set up fn api servers", e); + throw new RuntimeException(e); +} + } + + SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) { +final SamzaExecutionContext executionContext = new SamzaExecutionContext(); +setUpFnApiServer(); Review comment: setUpFnApiServer() needs to be folded inside setUpContextManager(), since the servers only need to be set up when the stream graph is created at runtime. Currently this is done at submission time.
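Folding server setup into the context manager amounts to lazy initialization: the first runtime `init()` call creates the servers, and later calls reuse them, so nothing is allocated at submission time. A minimal sketch under that assumption, with a hypothetical `LazyServers` wrapper and a `Supplier` standing in for the body of `setUpFnApiServer()`:

```java
import java.util.function.Supplier;

/** Hypothetical sketch: build expensive servers on the first runtime init(), not at submission. */
final class LazyServers<T> {
  private final Supplier<T> factory;
  private T servers; // stays null until the first init() call

  LazyServers(Supplier<T> factory) {
    this.factory = factory;
  }

  /** Called from the runtime init hook; only the first call builds the servers. */
  synchronized T init() {
    if (servers == null) {
      servers = factory.get();
    }
    return servers;
  }

  synchronized boolean isStarted() {
    return servers != null;
  }
}
```

The `synchronized` methods are a defensive choice in case several tasks call `init()` concurrently; whether Samza actually does so is not established in this thread.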
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180336=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180336 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244795786 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.samza; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.ServerFactory; +import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService; +import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Driver program that starts a job server. */ +public class SamzaJobServerDriver { + private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class); + + private final ServerConfiguration config; + + /** Configuration for the jobServer. */ + private static class ServerConfiguration { +@Option(name = "--job-port", usage = "The job service port. (Default: 11440)") +private int jobPort = 11440; + +@Option(name = "--control-port", usage = "The FnControl port. 
(Default: 11441)") +private int controlPort = 11441; + } + + private SamzaJobServerDriver(ServerConfiguration config) { +this.config = config; + } + + public static void main(String[] args) throws Exception { +ServerConfiguration configuration = new ServerConfiguration(); +CmdLineParser parser = new CmdLineParser(configuration); +try { + parser.parseArgument(args); + fromConfig(configuration).run(); +} catch (CmdLineException e) { + LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), e); + throw new IllegalArgumentException("Unable to parse command line arguments.", e); +} catch (Exception e) { + LOG.error("Hit exception with SamzaJobServer. Exiting...", e); + throw e; +} + } + + public static SamzaJobServerDriver fromConfig(ServerConfiguration config) { +return new SamzaJobServerDriver(config); + } + + private InMemoryJobService createJobService() throws IOException { +JobInvoker jobInvoker = +new JobInvoker() { + @Override + public JobInvocation invoke( + RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken) + throws IOException { +SamzaPipelineOptions samzaPipelineOptions = + PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class); +Map overrideConfig = +samzaPipelineOptions.getConfigOverride() != null +? samzaPipelineOptions.getConfigOverride() +: new HashMap<>(); +overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, String.valueOf(true)); +overrideConfig.put( +SamzaRunnerOverrideConfigs.FN_CONTROL_PORT, String.valueOf(config.controlPort)); +samzaPipelineOptions.setConfigOverride(overrideConfig); +return new
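The quoted `invoke()` writes the portable-mode settings into the map returned by `getConfigOverride()` in place. If that map could ever be unmodifiable, `put` would throw `UnsupportedOperationException`. A defensive alternative is to copy user overrides into a fresh map and then apply the settings the job server must force. The key strings below are placeholders, not the real values of the `SamzaRunnerOverrideConfigs` constants:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of layering job-server settings on top of user config overrides. */
final class ConfigOverrides {
  // Placeholder keys; the real constants live in SamzaRunnerOverrideConfigs.
  static final String IS_PORTABLE_MODE = "example.portable.mode";
  static final String FN_CONTROL_PORT = "example.fn.control.port";

  private ConfigOverrides() {}

  /** Returns a new map: user overrides first, then the settings the job server must force. */
  static Map<String, String> withPortableSettings(Map<String, String> userOverrides, int controlPort) {
    Map<String, String> merged =
        userOverrides == null ? new HashMap<>() : new HashMap<>(userOverrides);
    merged.put(IS_PORTABLE_MODE, String.valueOf(true));
    merged.put(FN_CONTROL_PORT, String.valueOf(controlPort));
    return merged;
  }
}
```

The result would then be passed to `setConfigOverride(...)`, just as the original code does with the mutated map.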
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180333 ] ASF GitHub Bot logged work on BEAM-6271: Author: ASF GitHub Bot Created on: 02/Jan/19 17:51 Start Date: 02/Jan/19 17:51 Worklog Time Spent: 10m Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner URL: https://github.com/apache/beam/pull/7321#discussion_r244791772 ## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ## @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) { this.options = options; } + private void closeFnServer(GrpcFnServer fnServer) { +try (AutoCloseable closer = fnServer) { + // do nothing +} catch (Exception e) { + LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e); +} + } + + private void setUpContextManager( + StreamGraph streamGraph, SamzaExecutionContext executionContext) { +streamGraph.withContextManager( +new ContextManager() { + @Override + public void init(Config config, TaskContext context) { +if (executionContext.getMetricsContainer() == null) { + final MetricsRegistryMap metricsRegistry = + (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry; + executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry)); +} + +if (SamzaRunnerOverrideConfigs.isPortableMode(options)) { + if (jobBundleFactory == null) { Review comment: Here we set jobBundleFactory across all Samza tasks, so they will share a single connection to sdk worker process. Shall we also consider the option to have jobBundleFactory for each task? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking --- Worklog Id: (was: 180333) Time Spent: 1h 50m (was: 1h 40m)
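The trade-off raised in this review is one `jobBundleFactory` shared by all Samza tasks (a single connection to the SDK worker) versus one factory per task (more isolation, more connections). The sketch below supports both behind a flag. `BundleFactories`, the shared key, and the `create` function are hypothetical. `ConcurrentHashMap.computeIfAbsent` also makes the create-once check atomic, which the quoted `if (jobBundleFactory == null)` check is not:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch: one shared bundle factory, or one factory per task. */
final class BundleFactories<F> {
  // Key used in shared mode; per-task mode keys by task name instead.
  private static final String SHARED_KEY = "shared";

  private final Map<String, F> factories = new ConcurrentHashMap<>();
  private final Function<String, F> create;
  private final boolean perTask;

  BundleFactories(Function<String, F> create, boolean perTask) {
    this.create = create;
    this.perTask = perTask;
  }

  /** Returns the factory the given task should use, creating it at most once per key. */
  F forTask(String taskName) {
    String key = perTask ? taskName : SHARED_KEY;
    return factories.computeIfAbsent(key, create);
  }
}
```

Because `perTask` is fixed for the lifetime of an instance, the shared key never coexists with task-name keys in the same map.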
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180335=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180335 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244793361

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java ##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
+    private int jobPort = 11440;
+
+    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
+    private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ServerConfiguration configuration = new ServerConfiguration();

Review comment:
I think this deserves a "final" here, and the line below too.

Issue Time Tracking
-------------------
Worklog Id: (was: 180335)
Time Spent: 2h (was: 1h 50m)
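The reviewer's "final" suggestion applies to locals that are assigned exactly once, such as `configuration` in `main`. The sketch below is a stdlib-only illustration of that style. It parses the two options declared in `ServerConfiguration`: `--job-port` (default 11440) and `--control-port` (default 11441).

A hand-rolled loop stands in for args4j's `CmdLineParser`, and `JobServerArgs` is a hypothetical name. Only space-separated `--flag value` pairs are handled.

```java
/**
 * Hypothetical, stdlib-only stand-in for the args4j-based ServerConfiguration:
 * parses "--job-port N" and "--control-port N" pairs.
 */
final class JobServerArgs {
  // Defaults mirror the @Option declarations quoted in the review above.
  static final int DEFAULT_JOB_PORT = 11440;
  static final int DEFAULT_CONTROL_PORT = 11441;

  final int jobPort;
  final int controlPort;

  private JobServerArgs(int jobPort, int controlPort) {
    this.jobPort = jobPort;
    this.controlPort = controlPort;
  }

  static JobServerArgs parse(final String[] args) {
    int jobPort = DEFAULT_JOB_PORT;
    int controlPort = DEFAULT_CONTROL_PORT;
    for (int i = 0; i < args.length; i += 2) {
      // Assigned once per iteration, so marked final as the review suggests.
      final String flag = args[i];
      if (i + 1 >= args.length) {
        throw new IllegalArgumentException("Missing value for " + flag);
      }
      final int value = Integer.parseInt(args[i + 1]);
      switch (flag) {
        case "--job-port":
          jobPort = value;
          break;
        case "--control-port":
          controlPort = value;
          break;
        default:
          throw new IllegalArgumentException("Unknown option: " + flag);
      }
    }
    return new JobServerArgs(jobPort, controlPort);
  }

  public static void main(String[] args) {
    // Both locals are assigned exactly once, so both can be final.
    final JobServerArgs config = JobServerArgs.parse(args);
    final String summary = "job port " + config.jobPort + ", control port " + config.controlPort;
    System.out.println(summary);
  }
}
```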
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180329 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244796257

## File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java ##
@@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }

+  private void closeFnServer(GrpcFnServer fnServer) {

Review comment:
static?

Issue Time Tracking
-------------------
Worklog Id: (was: 180329)
Time Spent: 1h 20m (was: 1h 10m)
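The "static?" question fits because closeFnServer appears to use only its parameter and the LOG field. Its full body is quoted in the SamzaRunner.java review comment above. The sketch below turns the helper into a static method under three assumptions:

- Any `AutoCloseable` takes the place of `GrpcFnServer`.
- `java.util.logging` takes the place of SLF4J.
- `FnServers` and `closeQuietly` are hypothetical names.

Unlike the original void method, this version returns whether close() succeeded, so callers and tests can observe the outcome.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical utility class; not part of Beam. */
final class FnServers {
  private static final Logger LOG = Logger.getLogger(FnServers.class.getName());

  private FnServers() {}

  /**
   * Closes the resource, logging instead of rethrowing any failure, since this
   * runs during shutdown. Returns true if close() completed without throwing.
   */
  static boolean closeQuietly(AutoCloseable server) {
    // try-with-resources calls server.close() on exit; a null server is skipped.
    try (AutoCloseable closer = server) {
      // nothing to do in the body
    } catch (Exception e) {
      LOG.log(Level.SEVERE, "Failed to close fn api server; ignoring during shutdown.", e);
      return false;
    }
    return true;
  }
}
```

Making the method static documents that it does not depend on any SamzaRunner instance state, and lets other shutdown paths reuse it.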
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177530=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177530 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 20/Dec/18 14:44
Start Date: 20/Dec/18 14:44
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-449021622

Thanks for the PR. Great to see another Runner becoming portable. I'll definitely check this out.

Issue Time Tracking
-------------------
Worklog Id: (was: 177530)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177314=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177314 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 20/Dec/18 03:27
Start Date: 20/Dec/18 03:27
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448847386

Run Java PreCommit

Issue Time Tracking
-------------------
Worklog Id: (was: 177314)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177315 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 20/Dec/18 03:27
Start Date: 20/Dec/18 03:27
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448847448

Run Portable_Python PreCommit

Issue Time Tracking
-------------------
Worklog Id: (was: 177315)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177313=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177313 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 20/Dec/18 03:25
Start Date: 20/Dec/18 03:25
Worklog Time Spent: 10m
Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448846782

Run Java PreCommit
Run Portable_Python PreCommit

Issue Time Tracking
-------------------
Worklog Id: (was: 177313)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177130=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177130 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 19/Dec/18 18:46
Start Date: 19/Dec/18 18:46
Worklog Time Spent: 10m
Work Description: kennknowles commented on issue #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448702966

Nice! I tagged a couple others who are good choices to take a look at this.

Issue Time Tracking
-------------------
Worklog Id: (was: 177130)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner
[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177085 ]

ASF GitHub Bot logged work on BEAM-6271:
Author: ASF GitHub Bot
Created on: 19/Dec/18 17:15
Start Date: 19/Dec/18 17:15
Worklog Time Spent: 10m
Work Description: lhaiesp opened a new pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321

Initial support for the portable api in the samza runner, including a job server inside the samza runner, config translation for portable pipelines, transform translation for portable pipelines, and refactoring of existing code to merge the logic of the portable api and the java api as much as possible.

The change does not yet include support for:
- batching the events transferred between the java runner and sdk workers (bundling)
- multiple sdk workers connecting to one java runner

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.

Post-Commit Tests Status (on master branch)

Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
--- | --- | --- | --- | --- | --- | --- | ---
Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | --- | --- | --- | ---
Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)
Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | --- | ---

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact