rmatharu commented on a change in pull request #1001: SAMZA-2168: Remove 
redundant SystemAdmin creation in ApplicationMaster
URL: https://github.com/apache/samza/pull/1001#discussion_r284033079
 
 

 ##########
 File path: 
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -52,54 +53,62 @@ class ProcessJobFactory extends StreamJobFactory with 
Logging {
 
     val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
 
-    val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
+    val systemAdmins = new SystemAdmins(configFromCoordinatorStream)
 
-    val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry)
-    val jobModel = coordinator.jobModel
+    try {
+      systemAdmins.start()
+      val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
 
-    val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
-    for (containerModel <- jobModel.getContainers.values) {
-      for (taskModel <- containerModel.getTasks.values) {
-        taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
-      }
-    }
 
-    changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+      val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry, systemAdmins)
+      val jobModel = coordinator.jobModel
 
-    //create necessary checkpoint and changelog streams
-    val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
-    if (checkpointManager != null) {
-      checkpointManager.createResources()
-    }
-    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
-
-    val containerModel = coordinator.jobModel.getContainers.get(0)
+      val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
+      for (containerModel <- jobModel.getContainers.values) {
+        for (taskModel <- containerModel.getTasks.values) {
+          taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
+        }
+      }
 
-    val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured
-    info("Process job. using fwkPath = " + fwkPath)
+      changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
-    val commandBuilder = {
-      config.getCommandClass match {
-        case Some(cmdBuilderClassName) => {
-          // A command class was specified, so we need to use a process job to
-          // execute the command in its own process.
-          Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
-        }
-        case _ => {
-          info("Defaulting to ShellCommandBuilder")
-          new ShellCommandBuilder
+      //create necessary checkpoint and changelog streams
+      val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+      if (checkpointManager != null) {
+        checkpointManager.createResources()
+      }
+      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions, systemAdmins)
 
 Review comment:
   We don't seem to have a test for this method and seems to be called from 
quite a few critical places.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to