[ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186669
 ]

ASF GitHub Bot logged work on BEAM-6271:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jan/19 02:14
            Start Date: 18/Jan/19 02:14
    Worklog Time Spent: 10m 
      Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248908054
 
 

 ##########
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##########
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer<?> fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown 
process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) 
context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new 
SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
+                try {
+                  final long waitTimeoutMs =
+                      
SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+                  final InstructionRequestHandler instructionHandler =
+                      controlClientPool
+                          .getSource()
+                          .take(SAMZA_WORKER_ID, 
Duration.ofMillis(waitTimeoutMs));
+                  final EnvironmentFactory environmentFactory =
+                      environment -> RemoteEnvironment.forHandler(environment, 
instructionHandler);
+                  // TODO: use 
JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+                  jobBundleFactory =
+                      SingleEnvironmentInstanceJobBundleFactory.create(
+                          environmentFactory, fnDataServer, fnStateServer);
+                } catch (Exception e) {
+                  throw new RuntimeException(
+                      "Running samza in Beam portable mode but failed to 
create job bundle factory",
+                      e);
+                }
+                executionContext.setJobBundleFactory(jobBundleFactory);
+              }
+            }
+
+            context.setUserContext(executionContext);
+          }
+
+          @Override
+          public void close() {
+            closeFnServer(fnControlServer);
+            fnControlServer = null;
+            closeFnServer(fnDataServer);
+            fnDataServer = null;
+            closeFnServer(fnStateServer);
+            fnStateServer = null;
+          }
+        });
+  }
+
+  private void setUpFnApiServer() {
+    controlClientPool = MapControlClientPool.create();
 
 Review comment:
   the pool doesn't have a way to close. So setting the variable to null is the 
best we could do. Do need to shutdown the executorService though. Good point
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 186669)
    Time Spent: 4h  (was: 3h 50m)

> initial support for portable api in samza runner
> ------------------------------------------------
>
>                 Key: BEAM-6271
>                 URL: https://issues.apache.org/jira/browse/BEAM-6271
>             Project: Beam
>          Issue Type: Task
>          Components: runner-samza
>            Reporter: Hai Lu
>            Assignee: Hai Lu
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Support portable api in Samza runner.
> This ticket tracks the initial effort to support portable api in Samza 
> runner, including job server inside samza runner, config translation for 
> portable pipeline, transform translation for portable pipeline, refactor of 
> existing codes to merge logic of portable api and java api as much as 
> possible, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to