This is an automated email from the ASF dual-hosted git repository. dchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 2688cc11e Fix performance bug with async commit (#1644) 2688cc11e is described below commit 2688cc11ea2cd4f5a91325a7161f869510e2a361 Author: shekhars-li <72765053+shekhars...@users.noreply.github.com> AuthorDate: Mon Nov 28 12:59:35 2022 -0800 Fix performance bug with async commit (#1644) --- .../src/main/java/org/apache/samza/storage/StateBackendFactory.java | 3 +++ .../org/apache/samza/storage/KafkaChangelogStateBackendFactory.java | 4 +++- .../apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java | 3 +++ .../src/main/scala/org/apache/samza/container/SamzaContainer.scala | 3 ++- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java index 91f3df3d0..44b40b4dc 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java @@ -20,6 +20,7 @@ package org.apache.samza.storage; import java.io.File; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.samza.config.Config; @@ -29,6 +30,7 @@ import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; import org.apache.samza.util.Clock; @@ -40,6 +42,7 @@ public interface StateBackendFactory { TaskBackupManager getBackupManager(JobContext jobContext, ContainerModel containerModel, TaskModel taskModel, + Map<String, SystemAdmin> systemNameSystemAdminMap, ExecutorService backupExecutor, MetricsRegistry taskInstanceMetricsRegistry, Config config, diff --git a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java 
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java index 0772449f4..9f3117156 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java +++ b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java @@ -40,6 +40,7 @@ import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SSPMetadataCache; import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; @@ -66,13 +67,14 @@ public class KafkaChangelogStateBackendFactory implements StateBackendFactory { public TaskBackupManager getBackupManager(JobContext jobContext, ContainerModel containerModel, TaskModel taskModel, + Map<String, SystemAdmin> systemNameSystemAdminsMap, ExecutorService backupExecutor, MetricsRegistry metricsRegistry, Config config, Clock clock, File loggedStoreBaseDir, File nonLoggedStoreBaseDir) { - SystemAdmins systemAdmins = new SystemAdmins(config); + SystemAdmins systemAdmins = new SystemAdmins(systemNameSystemAdminsMap); StorageConfig storageConfig = new StorageConfig(config); Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs(); diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java index e2512b489..dc6059840 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java @@ -21,6 +21,7 @@ package org.apache.samza.storage.blobstore; import com.google.common.base.Preconditions; import java.io.File; +import java.util.Map; import 
java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.commons.lang3.StringUtils; @@ -41,6 +42,7 @@ import org.apache.samza.storage.TaskBackupManager; import org.apache.samza.storage.TaskRestoreManager; import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.system.SystemAdmin; import org.apache.samza.util.Clock; import org.apache.samza.util.ReflectionUtil; @@ -51,6 +53,7 @@ public class BlobStoreStateBackendFactory implements StateBackendFactory { JobContext jobContext, ContainerModel containerModel, TaskModel taskModel, + Map<String, SystemAdmin> systemNameSystemAdminsMap, ExecutorService backupExecutor, MetricsRegistry metricsRegistry, Config config, diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index bba782525..3d3847470 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -572,6 +572,7 @@ object SamzaContainer extends Logging { info ("Got task side input SSPs: %s" format taskSideInputSSPs) val taskBackupManagerMap = new util.HashMap[String, TaskBackupManager]() + val systemAdminsMap = systemAdmins.getSystemAdmins stateStorageBackendBackupFactories.asJava.forEach(new Consumer[StateBackendFactory] { override def accept(factory: StateBackendFactory): Unit = { val taskMetricsRegistry = @@ -579,7 +580,7 @@ object SamzaContainer extends Logging { taskInstanceMetrics.get(taskName).isDefined) taskInstanceMetrics.get(taskName).get.registry else new MetricsRegistryMap val taskBackupManager = factory.getBackupManager(jobContext, containerModel, - taskModel, commitThreadPool, taskMetricsRegistry, config, SystemClock.instance(), + taskModel, systemAdminsMap, commitThreadPool, 
taskMetricsRegistry, config, SystemClock.instance, loggedStorageBaseDir, nonLoggedStorageBaseDir) taskBackupManagerMap.put(factory.getClass.getName, taskBackupManager) }