shanthoosh commented on a change in pull request #1136: SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner
URL: https://github.com/apache/samza/pull/1136#discussion_r317416728
##########
File path:
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
##########
@@ -272,15 +284,49 @@ CountDownLatch getShutdownLatch() {
}
@VisibleForTesting
- MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+ MetadataStore createCoordinatorStreamStore(Config config) {
if (metadataStoreFactory.isPresent()) {
- MetadataStore coordinatorStreamStore =
- metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
- return coordinatorStreamStore;
+ // TODO: Add missing metadata store abstraction for creating the underlying store to address SAMZA-2182
+ if (metadataStoreFactory.get() instanceof CoordinatorStreamMetadataStoreFactory) {
+ if (createUnderlyingCoordinatorStream(config)) {
+ MetadataStore coordinatorStreamStore =
+ metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
+ LOG.info("Created coordinator stream store of type: {}", coordinatorStreamStore.getClass().getSimpleName());
+ return coordinatorStreamStore;
+ }
+ } else {
+ MetadataStore otherMetadataStore =
+ metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
+ LOG.info("Created alternative coordinator stream store of type: {}", otherMetadataStore.getClass().getSimpleName());
+ return otherMetadataStore;
+ }
}
+
+ LOG.warn("No coordinator stream store created.");
return null;
}
+ @VisibleForTesting
+ boolean createUnderlyingCoordinatorStream(Config config) {
+ // TODO: This work around method is necessary due to SAMZA-2182 - Metadata store: disconnect between creation and usage of the underlying storage
+ // and will be addressed in the next phase of metadata store abstraction
+ if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
+ LOG.warn("{} or {} not configured. Coordinator stream not created.",
+ JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+ return false;
+ }
+ SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
Review comment:
Just curious.
It looks like the coordinator-stream creation control flow is idempotent.
Why can't we house this coordinator-stream creation logic in
`CoordinatorStreamMetadataStore.init()`? The `MetadataStore.init()` contract allows
us to do that (and the `ZkMetadataStore` implementation already does something
similar).
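
To illustrate the suggestion, here is a minimal sketch of a store whose `init()` creates its backing stream idempotently, so callers do not need a separate creation step. Every name in it (`IdempotentInitSketch`, `InMemoryStreamAdmin`, `SketchCoordinatorStreamStore`, `createStream`) is hypothetical and stands in for the real Samza classes; it is not the actual Samza API or the code in this pull request.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentInitSketch {

  /** Hypothetical stand-in for an admin client that can create streams. */
  static class InMemoryStreamAdmin {
    private final Set<String> streams = ConcurrentHashMap.newKeySet();

    /** Returns true only when this call actually created the stream. */
    boolean createStream(String name) {
      return streams.add(name);
    }

    int streamCount() {
      return streams.size();
    }
  }

  /** Hypothetical store that owns creation of its underlying stream. */
  static class SketchCoordinatorStreamStore {
    private final InMemoryStreamAdmin admin;
    private final String streamName;
    private boolean initialized = false;

    SketchCoordinatorStreamStore(InMemoryStreamAdmin admin, String streamName) {
      this.admin = admin;
      this.streamName = streamName;
    }

    /** Safe to call repeatedly: creating an existing stream is a no-op. */
    void init() {
      boolean created = admin.createStream(streamName);
      System.out.println(created ? "Created " + streamName : streamName + " already exists");
      initialized = true;
    }

    boolean isInitialized() {
      return initialized;
    }
  }

  public static void main(String[] args) {
    InMemoryStreamAdmin admin = new InMemoryStreamAdmin();
    SketchCoordinatorStreamStore store = new SketchCoordinatorStreamStore(admin, "coordinator-stream");
    store.init(); // first call creates the stream
    store.init(); // second call finds it and changes nothing
  }
}
```

With this shape, the runner would only need to obtain the store from the factory and call `init()`; the `instanceof` check and the separate creation helper would no longer be required, assuming the real stream creation is idempotent as it appears to be.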