This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2688cc11e Fix perferomance bug with async commit (#1644)
2688cc11e is described below

commit 2688cc11ea2cd4f5a91325a7161f869510e2a361
Author: shekhars-li <72765053+shekhars...@users.noreply.github.com>
AuthorDate: Mon Nov 28 12:59:35 2022 -0800

    Fix perferomance bug with async commit (#1644)
---
 .../src/main/java/org/apache/samza/storage/StateBackendFactory.java   | 3 +++
 .../org/apache/samza/storage/KafkaChangelogStateBackendFactory.java   | 4 +++-
 .../apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java  | 3 +++
 .../src/main/scala/org/apache/samza/container/SamzaContainer.scala    | 3 ++-
 4 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java 
b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
index 91f3df3d0..44b40b4dc 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
@@ -20,6 +20,7 @@
 package org.apache.samza.storage;
 
 import java.io.File;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.config.Config;
@@ -29,6 +30,7 @@ import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.Clock;
 
 
@@ -40,6 +42,7 @@ public interface StateBackendFactory {
   TaskBackupManager getBackupManager(JobContext jobContext,
       ContainerModel containerModel,
       TaskModel taskModel,
+      Map<String, SystemAdmin> systemNameSystemAdminMap,
       ExecutorService backupExecutor,
       MetricsRegistry taskInstanceMetricsRegistry,
       Config config,
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
 
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index 0772449f4..9f3117156 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -40,6 +40,7 @@ import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SSPMetadataCache;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
@@ -66,13 +67,14 @@ public class KafkaChangelogStateBackendFactory implements 
StateBackendFactory {
   public TaskBackupManager getBackupManager(JobContext jobContext,
       ContainerModel containerModel,
       TaskModel taskModel,
+      Map<String, SystemAdmin> systemNameSystemAdminsMap,
       ExecutorService backupExecutor,
       MetricsRegistry metricsRegistry,
       Config config,
       Clock clock,
       File loggedStoreBaseDir,
       File nonLoggedStoreBaseDir) {
-    SystemAdmins systemAdmins = new SystemAdmins(config);
+    SystemAdmins systemAdmins = new SystemAdmins(systemNameSystemAdminsMap);
     StorageConfig storageConfig = new StorageConfig(config);
     Map<String, SystemStream> storeChangelogs = 
storageConfig.getStoreChangelogs();
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
index e2512b489..dc6059840 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage.blobstore;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.lang3.StringUtils;
@@ -41,6 +42,7 @@ import org.apache.samza.storage.TaskBackupManager;
 import org.apache.samza.storage.TaskRestoreManager;
 import 
org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
 import 
org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.ReflectionUtil;
 
@@ -51,6 +53,7 @@ public class BlobStoreStateBackendFactory implements 
StateBackendFactory {
       JobContext jobContext,
       ContainerModel containerModel,
       TaskModel taskModel,
+      Map<String, SystemAdmin> systemNameSystemAdminsMap,
       ExecutorService backupExecutor,
       MetricsRegistry metricsRegistry,
       Config config,
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bba782525..3d3847470 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -572,6 +572,7 @@ object SamzaContainer extends Logging {
       info ("Got task side input SSPs: %s" format taskSideInputSSPs)
 
       val taskBackupManagerMap = new util.HashMap[String, TaskBackupManager]()
+      val systemAdminsMap = systemAdmins.getSystemAdmins
       stateStorageBackendBackupFactories.asJava.forEach(new 
Consumer[StateBackendFactory] {
         override def accept(factory: StateBackendFactory): Unit = {
           val taskMetricsRegistry =
@@ -579,7 +580,7 @@ object SamzaContainer extends Logging {
               taskInstanceMetrics.get(taskName).isDefined) 
taskInstanceMetrics.get(taskName).get.registry
             else new MetricsRegistryMap
           val taskBackupManager = factory.getBackupManager(jobContext, 
containerModel,
-            taskModel, commitThreadPool, taskMetricsRegistry, config, 
SystemClock.instance(),
+            taskModel, systemAdminsMap, commitThreadPool, taskMetricsRegistry, 
config, SystemClock.instance,
             loggedStorageBaseDir, nonLoggedStorageBaseDir)
           taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager)
         }

Reply via email to