[ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180333
 ]

ASF GitHub Bot logged work on BEAM-6271:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jan/19 17:51
            Start Date: 02/Jan/19 17:51
    Worklog Time Spent: 10m 
      Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244791772
 
 

 ##########
 File path: runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##########
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer<?> fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
 
 Review comment:
   Here we set a single jobBundleFactory that is shared across all Samza tasks, 
so they all share one connection to the SDK worker process. Should we also 
consider the option of having a separate jobBundleFactory for each task?
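
   For illustration only, the two scoping options can be sketched as below. 
This is a minimal sketch, not the code in this PR: `FactoryScopes`, 
`BundleFactory`, `sharedFactory` and `perTaskFactory` are hypothetical names, 
and `BundleFactory` is only a stand-in for whatever jobBundleFactory actually 
holds (assumed here to be one SDK worker connection per instance).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class FactoryScopes {
  /** Hypothetical stand-in: each instance models one SDK worker connection. */
  static final class BundleFactory {
    static final AtomicInteger CREATED = new AtomicInteger();
    final int id = CREATED.incrementAndGet();
  }

  // Option 1: a single factory shared by every task in the container.
  private static volatile BundleFactory shared;

  static BundleFactory sharedFactory() {
    if (shared == null) {
      synchronized (FactoryScopes.class) {
        // Re-check under the lock so only one task creates the factory.
        if (shared == null) {
          shared = new BundleFactory();
        }
      }
    }
    return shared;
  }

  // Option 2: one factory per task, keyed by a (hypothetical) task name.
  private static final Map<String, BundleFactory> PER_TASK = new ConcurrentHashMap<>();

  static BundleFactory perTaskFactory(String taskName) {
    // computeIfAbsent creates the factory at most once per task name.
    return PER_TASK.computeIfAbsent(taskName, name -> new BundleFactory());
  }

  public static void main(String[] args) {
    // Every caller sees the same shared instance.
    System.out.println(sharedFactory() == sharedFactory());
    // Different tasks get different instances.
    System.out.println(perTaskFactory("task-0") == perTaskFactory("task-1"));
  }
}
```

   In this sketch the shared variant keeps one connection for the whole 
container, while the per-task variant isolates tasks from each other at the 
cost of one connection per task.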
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 180333)
    Time Spent: 1h 50m  (was: 1h 40m)

> initial support for portable api in samza runner
> ------------------------------------------------
>
>                 Key: BEAM-6271
>                 URL: https://issues.apache.org/jira/browse/BEAM-6271
>             Project: Beam
>          Issue Type: Task
>          Components: runner-samza
>            Reporter: Hai Lu
>            Assignee: Hai Lu
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Support the portable API in the Samza runner.
> This ticket tracks the initial effort to support the portable API in the 
> Samza runner, including a job server inside the Samza runner, config 
> translation for portable pipelines, transform translation for portable 
> pipelines, refactoring of existing code to merge the logic of the portable 
> API and the Java API as much as possible, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
