This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2a34cf2 Refactor LocalApplicationRunner (#1249)
2a34cf2 is described below
commit 2a34cf29d17f68c31d5d2212939ffe61a1138539
Author: Ke Wu <[email protected]>
AuthorDate: Fri Jan 10 14:06:55 2020 -0800
Refactor LocalApplicationRunner (#1249)
* Refactor LocalApplicationRunner
Issues: LocalApplicationRunner has multiple constructors that follow
different initialization logic.
Changes:
1. Refactor all constructors to end up with the same private constructor.
2. Align "static final" and "final static" to "static final"
3. Group class variables based on accessibility
4. Minor fixes for inconsistent styling, return statements, etc.
Tests:
Unit Tests
* Revert LocalApplicationRunner(SamzaApplication app, Config config,
MetadataStoreFactory metadataStoreFactory) from package private back to public
---
.../samza/runtime/LocalApplicationRunner.java | 39 +++++++++++++---------
1 file changed, 23 insertions(+), 16 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index bf6dfce..9ad52b1 100644
---
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -76,9 +76,9 @@ public class LocalApplicationRunner implements
ApplicationRunner {
private static final Logger LOG =
LoggerFactory.getLogger(LocalApplicationRunner.class);
private static final String PROCESSOR_ID = UUID.randomUUID().toString();
- private final static String RUN_ID_METADATA_STORE =
"RunIdCoordinationStore";
+ private static final String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
private static final String METADATA_STORE_FACTORY_CONFIG =
"metadata.store.factory";
- public final static String DEFAULT_METADATA_STORE_FACTORY =
ZkMetadataStoreFactory.class.getName();
+ private static final String DEFAULT_METADATA_STORE_FACTORY =
ZkMetadataStoreFactory.class.getName();
private final ApplicationDescriptorImpl<? extends ApplicationDescriptor>
appDesc;
private final Set<Pair<StreamProcessor, MetadataStore>> processors =
ConcurrentHashMap.newKeySet();
@@ -88,9 +88,9 @@ public class LocalApplicationRunner implements
ApplicationRunner {
private final boolean isAppModeBatch;
private final Optional<CoordinationUtils> coordinationUtils;
private final Optional<MetadataStoreFactory> metadataStoreFactory;
+
private Optional<String> runId = Optional.empty();
private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
-
private ApplicationStatus appStatus = ApplicationStatus.New;
/**
@@ -111,10 +111,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
* @param metadataStoreFactory the instance of {@link MetadataStoreFactory}
to read and write to coordinator stream.
*/
public LocalApplicationRunner(SamzaApplication app, Config config,
MetadataStoreFactory metadataStoreFactory) {
- this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
- this.isAppModeBatch = new ApplicationConfig(config).getAppMode() ==
ApplicationConfig.ApplicationMode.BATCH;
- this.coordinationUtils = getCoordinationUtils(config);
- this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
+ this(ApplicationDescriptorUtil.getAppDescriptor(app, config),
getCoordinationUtils(config), metadataStoreFactory);
}
/**
@@ -122,12 +119,20 @@ public class LocalApplicationRunner implements
ApplicationRunner {
*/
@VisibleForTesting
LocalApplicationRunner(ApplicationDescriptorImpl<? extends
ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
+ this(appDesc, coordinationUtils,
getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig())));
+ }
+
+ private LocalApplicationRunner(
+ ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+ Optional<CoordinationUtils> coordinationUtils,
+ MetadataStoreFactory metadataStoreFactory) {
this.appDesc = appDesc;
- this.isAppModeBatch = new
ApplicationConfig(appDesc.getConfig()).getAppMode() ==
ApplicationConfig.ApplicationMode.BATCH;
+ this.isAppModeBatch = isAppModeBatch(appDesc.getConfig());
this.coordinationUtils = coordinationUtils;
- this.metadataStoreFactory =
Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new
JobConfig(appDesc.getConfig())));
+ this.metadataStoreFactory = Optional.ofNullable(metadataStoreFactory);
}
+ @VisibleForTesting
static MetadataStoreFactory
getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
JobCoordinatorConfig jobCoordinatorConfig = new
JobCoordinatorConfig(jobConfig);
@@ -144,8 +149,8 @@ public class LocalApplicationRunner implements
ApplicationRunner {
return null;
}
- private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
- if (!isAppModeBatch) {
+ private static Optional<CoordinationUtils> getCoordinationUtils(Config
config) {
+ if (!isAppModeBatch(config)) {
return Optional.empty();
}
JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
@@ -154,6 +159,10 @@ public class LocalApplicationRunner implements
ApplicationRunner {
return Optional.ofNullable(coordinationUtils);
}
+ private static boolean isAppModeBatch(Config config) {
+ return new ApplicationConfig(config).getAppMode() ==
ApplicationConfig.ApplicationMode.BATCH;
+ }
+
/**
* @return LocalJobPlanner created
*/
@@ -185,7 +194,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(),
metadataStore));
runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
} catch (Exception e) {
- LOG.warn("Failed to generate run id. Will continue execution without a
run id. Caused by {}", e);
+ LOG.warn("Failed to generate run id. Will continue execution without a
run id.", e);
}
}
@@ -278,7 +287,7 @@ public class LocalApplicationRunner implements
ApplicationRunner {
@VisibleForTesting
protected Set<StreamProcessor> getProcessors() {
- return processors.stream().map(sp ->
sp.getLeft()).collect(Collectors.toSet());
+ return processors.stream().map(Pair::getLeft).collect(Collectors.toSet());
}
@VisibleForTesting
@@ -340,10 +349,8 @@ public class LocalApplicationRunner implements
ApplicationRunner {
appDesc.getMetricsReporterFactories().forEach((name, factory) ->
reporters.put(name, factory.getMetricsReporter(name, processorId,
config)));
- StreamProcessor streamProcessor = new StreamProcessor(processorId, config,
reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
+ return new StreamProcessor(processorId, config, reporters, taskFactory,
appDesc.getApplicationContainerContextFactory(),
appDesc.getApplicationTaskContextFactory(), externalContextOptional,
listenerFactory, null, coordinatorStreamStore);
-
- return streamProcessor;
}
/**