dnishimura commented on a change in pull request #1136: SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner
URL: https://github.com/apache/samza/pull/1136#discussion_r317417879
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
 ##########
 @@ -272,15 +284,49 @@ CountDownLatch getShutdownLatch() {
   }
 
   @VisibleForTesting
-  MetadataStore createCoordinatorStreamStore(Config jobConfig) {
+  MetadataStore createCoordinatorStreamStore(Config config) {
     if (metadataStoreFactory.isPresent()) {
-      MetadataStore coordinatorStreamStore =
-          metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
-      return coordinatorStreamStore;
+      // TODO: Add missing metadata store abstraction for creating the underlying store to address SAMZA-2182
+      if (metadataStoreFactory.get() instanceof CoordinatorStreamMetadataStoreFactory) {
+        if (createUnderlyingCoordinatorStream(config)) {
+          MetadataStore coordinatorStreamStore =
+              metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
+          LOG.info("Created coordinator stream store of type: {}", coordinatorStreamStore.getClass().getSimpleName());
+          return coordinatorStreamStore;
+        }
+      } else {
+        MetadataStore otherMetadataStore =
+            metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
+        LOG.info("Created alternative coordinator stream store of type: {}", otherMetadataStore.getClass().getSimpleName());
+        return otherMetadataStore;
+      }
     }
+
+    LOG.warn("No coordinator stream store created.");
     return null;
   }
 
+  @VisibleForTesting
+  boolean createUnderlyingCoordinatorStream(Config config) {
 
 Review comment:
   Not true. Coordinator streams were introduced to Standalone only recently. Jobs that use the passthrough coordinator may not define a default system or `job.coordinator.system`, and therefore cannot create a coordinator stream.
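
   To make the point concrete, here is a minimal sketch of the kind of guard this implies. It is not the code in this PR: the class and method names are hypothetical, the config is modeled as a plain `Map<String, String>` rather than Samza's `Config`, and `job.default.system` is assumed to be the key behind "default system".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Samza or this PR.
final class CoordinatorStreamCheck {
  // Key named in the review comment.
  static final String COORDINATOR_SYSTEM = "job.coordinator.system";
  // Assumed key for the "default system" mentioned in the review comment.
  static final String DEFAULT_SYSTEM = "job.default.system";

  // Returns true only if the config names a system that could host a coordinator stream.
  static boolean hasCoordinatorSystem(Map<String, String> config) {
    return isSet(config.get(COORDINATOR_SYSTEM)) || isSet(config.get(DEFAULT_SYSTEM));
  }

  // Treats null and blank values as "not configured".
  private static boolean isSet(String value) {
    return value != null && !value.trim().isEmpty();
  }

  public static void main(String[] args) {
    // A passthrough-style job that configures neither key.
    Map<String, String> passthroughJob = new HashMap<>();
    System.out.println("passthrough job: " + hasCoordinatorSystem(passthroughJob));

    // A job that names a coordinator system explicitly.
    Map<String, String> jobWithSystem = new HashMap<>();
    jobWithSystem.put(COORDINATOR_SYSTEM, "kafka");
    System.out.println("job with system: " + hasCoordinatorSystem(jobWithSystem));
  }
}
```

   With a check along these lines, a job that sets neither key would skip coordinator stream creation instead of failing.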

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
