This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
     new ad8af0c2d Fix restore factory not configured log (#1645)
ad8af0c2d is described below

commit ad8af0c2d431646c00a773c63e671d4560b573b3
Author: Daniel Chen <xrc...@uwaterloo.ca>
AuthorDate: Thu Dec 15 10:46:12 2022 -0800

    Fix restore factory not configured log (#1645)
---
 .../main/scala/org/apache/samza/storage/ContainerStorageManager.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 326908535..e58b8c301 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -441,6 +441,11 @@ public class ContainerStorageManager {
       backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
         StateBackendFactory factory = factories.get(factoryName);
+        if (factory == null) {
+          throw new SamzaException(
+              String.format("Required restore state backend factory: %s not found in configured factories %s",
+                  factoryName, String.join(", ", factories.keySet())));
+        }
         KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers, inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes, taskInstanceCollectors.get(taskName));