This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a34cf2  Refactor LocalApplicationRunner (#1249)
2a34cf2 is described below

commit 2a34cf29d17f68c31d5d2212939ffe61a1138539
Author: Ke Wu <[email protected]>
AuthorDate: Fri Jan 10 14:06:55 2020 -0800

    Refactor LocalApplicationRunner (#1249)
    
    * Refactor LocalApplicationRunner
    
    Issues: LocalApplicationRunner has multiple constructors that follow 
different initialization logic.
    
    Changes:
    
    1. Refactor all constructors to end up with the same private constructor.
    2. Align "static final" and "final static" to "static final"
    3. Group class variables based on accessibility
    4. Minor fixes for inconsistent styling, return statements, etc.
    
    Tests:
    Unit Tests
    
    * Revert LocalApplicationRunner(SamzaApplication app, Config config, 
MetadataStoreFactory metadataStoreFactory) from package private back to public
---
 .../samza/runtime/LocalApplicationRunner.java      | 39 +++++++++++++---------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index bf6dfce..9ad52b1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -76,9 +76,9 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalApplicationRunner.class);
   private static final String PROCESSOR_ID = UUID.randomUUID().toString();
-  private final  static String RUN_ID_METADATA_STORE = 
"RunIdCoordinationStore";
+  private static final String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
   private static final String METADATA_STORE_FACTORY_CONFIG = 
"metadata.store.factory";
-  public final static String DEFAULT_METADATA_STORE_FACTORY = 
ZkMetadataStoreFactory.class.getName();
+  private static final String DEFAULT_METADATA_STORE_FACTORY = 
ZkMetadataStoreFactory.class.getName();
 
   private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> 
appDesc;
   private final Set<Pair<StreamProcessor, MetadataStore>> processors = 
ConcurrentHashMap.newKeySet();
@@ -88,9 +88,9 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
   private final boolean isAppModeBatch;
   private final Optional<CoordinationUtils> coordinationUtils;
   private final Optional<MetadataStoreFactory> metadataStoreFactory;
+
   private Optional<String> runId = Optional.empty();
   private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
-
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
   /**
@@ -111,10 +111,7 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
    * @param metadataStoreFactory the instance of {@link MetadataStoreFactory} 
to read and write to coordinator stream.
    */
   public LocalApplicationRunner(SamzaApplication app, Config config, 
MetadataStoreFactory metadataStoreFactory) {
-    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-    this.isAppModeBatch = new ApplicationConfig(config).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
-    this.coordinationUtils = getCoordinationUtils(config);
-    this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
+    this(ApplicationDescriptorUtil.getAppDescriptor(app, config), 
getCoordinationUtils(config), metadataStoreFactory);
   }
 
   /**
@@ -122,12 +119,20 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
    */
   @VisibleForTesting
   LocalApplicationRunner(ApplicationDescriptorImpl<? extends 
ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
+    this(appDesc, coordinationUtils, 
getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig())));
+  }
+
+  private LocalApplicationRunner(
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+      Optional<CoordinationUtils> coordinationUtils,
+      MetadataStoreFactory metadataStoreFactory) {
     this.appDesc = appDesc;
-    this.isAppModeBatch = new 
ApplicationConfig(appDesc.getConfig()).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+    this.isAppModeBatch = isAppModeBatch(appDesc.getConfig());
     this.coordinationUtils = coordinationUtils;
-    this.metadataStoreFactory = 
Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new 
JobConfig(appDesc.getConfig())));
+    this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
   }
 
+  @VisibleForTesting
   static MetadataStoreFactory 
getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
     String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
     JobCoordinatorConfig jobCoordinatorConfig = new 
JobCoordinatorConfig(jobConfig);
@@ -144,8 +149,8 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
     return null;
   }
 
-  private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
-    if (!isAppModeBatch) {
+  private static Optional<CoordinationUtils> getCoordinationUtils(Config 
config) {
+    if (!isAppModeBatch(config)) {
       return Optional.empty();
     }
     JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
@@ -154,6 +159,10 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
     return Optional.ofNullable(coordinationUtils);
   }
 
+  private static boolean isAppModeBatch(Config config) {
+    return new ApplicationConfig(config).getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH;
+  }
+
   /**
    * @return LocalJobPlanner created
    */
@@ -185,7 +194,7 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
       runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), 
metadataStore));
       runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
     } catch (Exception e) {
-      LOG.warn("Failed to generate run id. Will continue execution without a 
run id. Caused by {}", e);
+      LOG.warn("Failed to generate run id. Will continue execution without a 
run id.", e);
     }
   }
 
@@ -278,7 +287,7 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
 
   @VisibleForTesting
   protected Set<StreamProcessor> getProcessors() {
-    return processors.stream().map(sp -> 
sp.getLeft()).collect(Collectors.toSet());
+    return processors.stream().map(Pair::getLeft).collect(Collectors.toSet());
   }
 
   @VisibleForTesting
@@ -340,10 +349,8 @@ public class LocalApplicationRunner implements 
ApplicationRunner {
     appDesc.getMetricsReporterFactories().forEach((name, factory) ->
         reporters.put(name, factory.getMetricsReporter(name, processorId, 
config)));
 
-    StreamProcessor streamProcessor = new StreamProcessor(processorId, config, 
reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
+    return new StreamProcessor(processorId, config, reporters, taskFactory, 
appDesc.getApplicationContainerContextFactory(),
           appDesc.getApplicationTaskContextFactory(), externalContextOptional, 
listenerFactory, null, coordinatorStreamStore);
-
-    return streamProcessor;
   }
 
   /**

Reply via email to