kw2542 commented on a change in pull request #1354:
URL: https://github.com/apache/samza/pull/1354#discussion_r419785277



##########
File path: samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
##########
@@ -110,13 +111,34 @@ object CoordinatorStreamUtil extends Logging {
       jobConfig.getJobId)
   }
 
+  /**
+   * Reads and returns launch config persisted in coordinator stream. Only job.auto sizing configs are currently supported.
+   * @param config full job config
+   * @param metadataStore an instance of the instantiated MetadataStore
+   * @return empty config if auto sizing is disabled, otherwise auto sizing related configs.
+   */
+  def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: MetadataStore): Config = {
+    if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
+      new MapConfig()
+    } else {
+      val config = readConfigFromCoordinatorStream(metadataStore)
+      val launchConfig: util.Map[String, String] = new util.HashMap[String, String]()
+      for ((key:String, value:String) <- config.asScala) {
+        if (key.startsWith(JobConfig.JOB_AUTOSIZING_CONFIG_PREFIX)) {

Review comment:
       This is great!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to