shanthoosh commented on a change in pull request #1136: SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner
URL: https://github.com/apache/samza/pull/1136#discussion_r317416728
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##########
 @@ -272,15 +284,49 @@ CountDownLatch getShutdownLatch() {
   }
 
   @VisibleForTesting
-  MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+  MetadataStore createCoordinatorStreamStore(Config config) {
     if (metadataStoreFactory.isPresent()) {
-      MetadataStore coordinatorStreamStore =
-          metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
-      return coordinatorStreamStore;
+      // TODO: Add missing metadata store abstraction for creating the underlying store to address SAMZA-2182
+      if (metadataStoreFactory.get() instanceof CoordinatorStreamMetadataStoreFactory) {
+        if (createUnderlyingCoordinatorStream(config)) {
+          MetadataStore coordinatorStreamStore =
+              metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
+          LOG.info("Created coordinator stream store of type: {}", coordinatorStreamStore.getClass().getSimpleName());
+          return coordinatorStreamStore;
+        }
+      } else {
+        MetadataStore otherMetadataStore =
+            metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
+        LOG.info("Created alternative coordinator stream store of type: {}", otherMetadataStore.getClass().getSimpleName());
+        return otherMetadataStore;
+      }
     }
+
+    LOG.warn("No coordinator stream store created.");
     return null;
   }
 
+  @VisibleForTesting
+  boolean createUnderlyingCoordinatorStream(Config config) {
+    // TODO: This work around method is necessary due to SAMZA-2182 - Metadata store: disconnect between creation and usage of the underlying storage
+    //  and will be addressed in the next phase of metadata store abstraction
+    if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
+      LOG.warn("{} or {} not configured. Coordinator stream not created.",
+          JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
+      return false;
+    }
+    SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
 
 Review comment:
   Just curious.
   
   It looks like the coordinator-stream creation control flow is idempotent. 
   Why can't we house this coordinator-stream creation logic in 
   `CoordinatorStreamMetadataStore.init()`? The `MetadataStore.init()` contract 
   allows us to do that (and the `ZkMetadataStore` implementation already does 
   something similar).
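
   A rough sketch of the idea, to make the suggestion concrete. Everything here is hypothetical and self-contained: `FakeStreamAdmin` and `SketchCoordinatorStreamStore` are stand-ins I made up for illustration, not the real Samza `SystemAdmin` or `MetadataStore` types. The only point is that if stream creation is idempotent, `init()` can call it unconditionally, and the runner no longer needs a separate `createUnderlyingCoordinatorStream` step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InitCreatesStreamSketch {

  /** Hypothetical stand-in for whatever admin creates the underlying stream. */
  static class FakeStreamAdmin {
    private final Map<String, Boolean> streams = new ConcurrentHashMap<>();
    int createCalls = 0;

    /** Idempotent: returns true only when the stream was newly created. */
    boolean createStream(String name) {
      createCalls++;
      return streams.putIfAbsent(name, Boolean.TRUE) == null;
    }

    boolean exists(String name) {
      return streams.containsKey(name);
    }
  }

  /** Hypothetical, simplified store; not the real MetadataStore interface. */
  static class SketchCoordinatorStreamStore {
    private final FakeStreamAdmin admin;
    private final String streamName;
    private final Map<String, byte[]> data = new ConcurrentHashMap<>();
    private boolean initialized = false;

    SketchCoordinatorStreamStore(FakeStreamAdmin admin, String streamName) {
      this.admin = admin;
      this.streamName = streamName;
    }

    /** Creates the underlying stream if needed, then marks the store usable. */
    void init() {
      admin.createStream(streamName); // safe to repeat because creation is idempotent
      initialized = true;
    }

    void put(String key, byte[] value) {
      if (!initialized) {
        throw new IllegalStateException("init() must be called before put()");
      }
      data.put(key, value);
    }

    byte[] get(String key) {
      return data.get(key);
    }
  }

  public static void main(String[] args) {
    FakeStreamAdmin admin = new FakeStreamAdmin();
    SketchCoordinatorStreamStore store =
        new SketchCoordinatorStreamStore(admin, "sketch-coordinator-stream");

    store.init();
    store.init(); // a second init() does not fail or create a duplicate stream
    store.put("key", new byte[] {1});

    System.out.println("stream exists: " + admin.exists("sketch-coordinator-stream"));
    System.out.println("create attempts: " + admin.createCalls);
  }
}
```

   With this shape, the caller only constructs the store and calls `init()`; whether the real coordinator-stream creation path is safe to invoke on every `init()` is the assumption that would need to be confirmed.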

