This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 540327e  [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)
540327e is described below

commit 540327eab201ede710681cd07de8f8105a506730
Author: xinyuiscool <xinyuliu...@gmail.com>
AuthorDate: Thu Apr 11 18:29:32 2019 -0700

    [BEAM-7059] SamzaRunner: fix the job.id inconsistency in the new Samza version (#8279)
---
 .../beam/runners/samza/SamzaExecutionContext.java  | 33 ++++++++++++++++------
 .../org/apache/beam/runners/samza/SamzaRunner.java |  5 ++--
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 10 ++++++-
 .../runners/samza/translation/ConfigBuilder.java   |  2 ++
 4 files changed, 38 insertions(+), 12 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
index af65135..0867e51 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
@@ -59,6 +59,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
   private GrpcFnServer<GrpcDataService> fnDataServer;
   private GrpcFnServer<GrpcStateService> fnStateServer;
   private ControlClientPool controlClientPool;
+  private ExecutorService dataExecutor;
   private IdGenerator idGenerator = IdGenerators.incrementingLongs();
 
   public SamzaExecutionContext(SamzaPipelineOptions options) {
@@ -92,7 +93,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
     if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
       try {
         controlClientPool = MapControlClientPool.create();
-        final ExecutorService dataExecutor = Executors.newCachedThreadPool();
+        dataExecutor = Executors.newCachedThreadPool();
 
         fnControlServer =
             GrpcFnServer.allocatePortAndCreateFor(
@@ -100,18 +101,23 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
                     controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
                 ServerFactory.createWithPortSupplier(
                     () -> 
SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+        LOG.info("Started control server on port {}", 
fnControlServer.getServer().getPort());
 
         fnDataServer =
             GrpcFnServer.allocatePortAndCreateFor(
                 GrpcDataService.create(dataExecutor, 
OutboundObserverFactory.serverDirect()),
                 ServerFactory.createDefault());
+        LOG.info("Started data server on port {}", 
fnDataServer.getServer().getPort());
 
         fnStateServer =
             GrpcFnServer.allocatePortAndCreateFor(
                 GrpcStateService.create(), ServerFactory.createDefault());
+        LOG.info("Started state server on port {}", 
fnStateServer.getServer().getPort());
 
         final long waitTimeoutMs =
             SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+        LOG.info("Control client wait timeout config: " + waitTimeoutMs);
+
         final InstructionRequestHandler instructionHandler =
             controlClientPool.getSource().take(SAMZA_WORKER_ID, 
Duration.ofMillis(waitTimeoutMs));
         final EnvironmentFactory environmentFactory =
@@ -120,6 +126,7 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
         jobBundleFactory =
             SingleEnvironmentInstanceJobBundleFactory.create(
                 environmentFactory, fnDataServer, fnStateServer, idGenerator);
+        LOG.info("Started job bundle factory");
       } catch (Exception e) {
         throw new RuntimeException(
             "Running samza in Beam portable mode but failed to create job 
bundle factory", e);
@@ -131,19 +138,29 @@ public class SamzaExecutionContext implements ApplicationContainerContext {
 
   @Override
   public void stop() {
-    closeFnServer(fnControlServer);
+    closeAutoClosable(fnControlServer, "controlServer");
     fnControlServer = null;
-    closeFnServer(fnDataServer);
+    closeAutoClosable(fnDataServer, "dataServer");
     fnDataServer = null;
-    closeFnServer(fnStateServer);
+    closeAutoClosable(fnStateServer, "stateServer");
     fnStateServer = null;
+    if (dataExecutor != null) {
+      dataExecutor.shutdown();
+      dataExecutor = null;
+    }
+    controlClientPool = null;
+    closeAutoClosable(jobBundleFactory, "jobBundle");
+    jobBundleFactory = null;
   }
 
-  private void closeFnServer(GrpcFnServer<?> fnServer) {
-    try (AutoCloseable closer = fnServer) {
-      // do nothing
+  private static void closeAutoClosable(AutoCloseable closeable, String name) {
+    try (AutoCloseable closer = closeable) {
+      LOG.info("Closed {}", name);
     } catch (Exception e) {
-      LOG.error("Failed to close fn api servers. Ignore since this is shutdown 
process...", e);
+      LOG.error(
+          "Failed to close {}. Ignore since this is shutdown process...",
+          closeable.getClass().getSimpleName(),
+          e);
     }
   }
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 36d47a8..3a9e442 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -102,9 +102,7 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
 
     pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Post-processed Beam pipeline:\n{}", 
PipelineDotRenderer.toDotString(pipeline));
-    }
+    LOG.info("Beam pipeline DOT graph:\n{}", 
PipelineDotRenderer.toDotString(pipeline));
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
 
@@ -141,6 +139,7 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
         final MetricsReporter reporter = options.getMetricsReporters().get(i);
 
         reporters.put(name, (MetricsReporterFactory) (nm, processorId, config) 
-> reporter);
+        LOG.info(name + ": " + reporter.getClass().getName());
       }
       return reporters;
     } else {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 146a9a8..4394675 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -344,13 +344,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     private void loadEventTimeTimers() {
       if (!eventTimerTimerState.isEmpty().read()) {
         final Iterator<KeyedTimerData<K>> iter = 
eventTimerTimerState.readIterator().read();
-        for (int i = 0; i < timerBufferSize && iter.hasNext(); i++) {
+        int i = 0;
+        for (; i < timerBufferSize && iter.hasNext(); i++) {
           eventTimeTimers.add(iter.next());
         }
 
+        LOG.info("Loaded {} event time timers in memory", i);
+
         // manually close the iterator here
         final SamzaStoreStateInternals.KeyValueIteratorState iteratorState =
             (SamzaStoreStateInternals.KeyValueIteratorState) 
eventTimerTimerState;
+
         iteratorState.closeIterators();
       }
     }
@@ -359,11 +363,15 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       if (!processingTimerTimerState.isEmpty().read()) {
         final Iterator<KeyedTimerData<K>> iter = 
processingTimerTimerState.readIterator().read();
         // since the iterator will reach to the end, it will be closed 
automatically
+        int count = 0;
         while (iter.hasNext()) {
           final KeyedTimerData<K> keyedTimerData = iter.next();
           timerRegistry.schedule(
               keyedTimerData, 
keyedTimerData.getTimerData().getTimestamp().getMillis());
+          ++count;
         }
+
+        LOG.info("Loaded {} processing time timers in memory", count);
       }
     }
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index 42a0d97..975baa2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -82,6 +82,8 @@ public class ConfigBuilder {
       // apply user configs
       config.putAll(createUserConfig(options));
 
+      config.put(ApplicationConfig.APP_NAME, options.getJobName());
+      config.put(ApplicationConfig.APP_ID, options.getJobInstance());
       config.put(JobConfig.JOB_NAME(), options.getJobName());
       config.put(JobConfig.JOB_ID(), options.getJobInstance());
 

Reply via email to