This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ad8af0c2d Fix restore factory not configured log (#1645)
ad8af0c2d is described below

commit ad8af0c2d431646c00a773c63e671d4560b573b3
Author: Daniel Chen <xrc...@uwaterloo.ca>
AuthorDate: Thu Dec 15 10:46:12 2022 -0800

    Fix restore factory not configured log (#1645)
---
 .../main/scala/org/apache/samza/storage/ContainerStorageManager.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 326908535..e58b8c301 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -441,6 +441,11 @@ public class ContainerStorageManager {
 
     backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
       StateBackendFactory factory = factories.get(factoryName);
+      if (factory == null) {
+        throw new SamzaException(
+            String.format("Required restore state backend factory: %s not 
found in configured factories %s",
+                factoryName, String.join(", ", factories.keySet())));
+      }
       KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new 
KafkaChangelogRestoreParams(storeConsumers,
           inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), 
storageEngineFactories, serdes,
           taskInstanceCollectors.get(taskName));

Reply via email to