[ https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180333 ]
ASF GitHub Bot logged work on BEAM-6271:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jan/19 17:51
            Start Date: 02/Jan/19 17:51
    Worklog Time Spent: 10m
      Work Description: xinyuiscool commented on pull request #7321: [BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244791772

 ##########
 File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##########
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }

+  private void closeFnServer(GrpcFnServer<?> fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {

 Review comment:
   Here we set jobBundleFactory across all Samza tasks, so they will share a single connection to sdk worker process. Shall we also consider the option to have jobBundleFactory for each task?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 180333)
    Time Spent: 1h 50m  (was: 1h 40m)

> initial support for portable api in samza runner
> ------------------------------------------------
>
>                 Key: BEAM-6271
>                 URL: https://issues.apache.org/jira/browse/BEAM-6271
>             Project: Beam
>          Issue Type: Task
>          Components: runner-samza
>            Reporter: Hai Lu
>            Assignee: Hai Lu
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Support portable api in Samza runner.
> This ticket tracks the initial effort to support portable api in Samza
> runner, including job server inside samza runner, config translation for
> portable pipeline, transform translation for portable pipeline, refactor of
> existing codes to merge logic of portable api and java api as much as
> possible, etc.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
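
The review comment above weighs one jobBundleFactory shared by all Samza tasks against one per task. The trade-off can be sketched roughly as below. This is only an illustration under stated assumptions: FakeBundleFactory, SharedFactoryHolder and PerTaskFactoryHolder are hypothetical names that stand in for the real objects, and nothing here uses the Beam or Samza APIs or reflects what the pull request actually does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BundleFactorySharing {

  /** Hypothetical stand-in for a factory holding a connection to the SDK worker. */
  static final class FakeBundleFactory {}

  /** Option 1: lazily create one factory and hand the same instance to every task. */
  static final class SharedFactoryHolder {
    private FakeBundleFactory factory;

    synchronized FakeBundleFactory get(String taskName) {
      if (factory == null) {
        // Created once; later tasks reuse it regardless of taskName.
        factory = new FakeBundleFactory();
      }
      return factory;
    }
  }

  /** Option 2: lazily create a separate factory for each task name. */
  static final class PerTaskFactoryHolder {
    private final Map<String, FakeBundleFactory> factories = new ConcurrentHashMap<>();

    FakeBundleFactory get(String taskName) {
      // computeIfAbsent creates at most one factory per distinct task name.
      return factories.computeIfAbsent(taskName, name -> new FakeBundleFactory());
    }
  }

  public static void main(String[] args) {
    SharedFactoryHolder shared = new SharedFactoryHolder();
    // Two tasks see the same instance, i.e. a single shared connection.
    System.out.println(shared.get("task-0") == shared.get("task-1")); // true

    PerTaskFactoryHolder perTask = new PerTaskFactoryHolder();
    // Two tasks see different instances, i.e. one connection each.
    System.out.println(perTask.get("task-0") == perTask.get("task-1")); // false
  }
}
```

Under these assumptions, the shared variant keeps the number of connections constant as tasks are added, while the per-task variant isolates tasks from each other at the cost of one connection per task.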